drasi_source_platform/lib.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform Source Plugin for Drasi
16//!
17//! This plugin consumes data change events from Redis Streams, which is the primary
18//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
19//! containing node and relation changes, as well as control events for query subscriptions.
20//!
21//! # Architecture
22//!
23//! The platform source connects to Redis as a consumer group member, enabling:
24//! - **At-least-once delivery**: Messages are acknowledged after processing
25//! - **Horizontal scaling**: Multiple consumers can share the workload
26//! - **Fault tolerance**: Unacknowledged messages are redelivered
27//!
28//! # Configuration
29//!
30//! | Field | Type | Default | Description |
31//! |-------|------|---------|-------------|
32//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
33//! | `stream_key` | string | *required* | Redis stream key to consume from |
34//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
35//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
36//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
37//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
38//!
39//! # Data Format
40//!
41//! The platform source expects CloudEvent-wrapped messages with a `data` array
42//! containing change events. Each event includes an operation type and payload.
43//!
44//! ## Node Insert
45//!
46//! ```json
47//! {
48//! "data": [{
49//! "op": "i",
50//! "payload": {
51//! "after": {
52//! "id": "user-123",
53//! "labels": ["User"],
54//! "properties": {
55//! "name": "Alice",
56//! "email": "alice@example.com"
57//! }
58//! },
59//! "source": {
60//! "db": "mydb",
61//! "table": "node",
62//! "ts_ns": 1699900000000000000
63//! }
64//! }
65//! }]
66//! }
67//! ```
68//!
69//! ## Node Update
70//!
71//! ```json
72//! {
73//! "data": [{
74//! "op": "u",
75//! "payload": {
76//! "after": {
77//! "id": "user-123",
78//! "labels": ["User"],
79//! "properties": { "name": "Alice Updated" }
80//! },
81//! "source": { "table": "node", "ts_ns": 1699900001000000000 }
82//! }
83//! }]
84//! }
85//! ```
86//!
87//! ## Node Delete
88//!
89//! ```json
90//! {
91//! "data": [{
92//! "op": "d",
93//! "payload": {
94//! "before": {
95//! "id": "user-123",
96//! "labels": ["User"],
97//! "properties": {}
98//! },
99//! "source": { "table": "node", "ts_ns": 1699900002000000000 }
100//! }
101//! }]
102//! }
103//! ```
104//!
105//! ## Relation Insert
106//!
107//! ```json
108//! {
109//! "data": [{
110//! "op": "i",
111//! "payload": {
112//! "after": {
113//! "id": "follows-1",
114//! "labels": ["FOLLOWS"],
115//! "startId": "user-123",
116//! "endId": "user-456",
117//! "properties": { "since": "2024-01-01" }
118//! },
119//! "source": { "table": "rel", "ts_ns": 1699900003000000000 }
120//! }
121//! }]
122//! }
123//! ```
124//!
125//! # Control Events
126//!
127//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
128//! Currently supported control types:
129//!
130//! - **SourceSubscription**: Query subscription management
131//!
132//! # Example Configuration (YAML)
133//!
134//! ```yaml
135//! source_type: platform
136//! properties:
137//! redis_url: "redis://localhost:6379"
138//! stream_key: "my-app-changes"
139//! consumer_group: "drasi-consumers"
140//! batch_size: 50
141//! block_ms: 10000
142//! ```
143//!
144//! # Usage Example
145//!
//! ```rust,ignore
//! use drasi_source_platform::PlatformSource;
//! use std::sync::Arc;
//!
//! let source = PlatformSource::builder("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
//! ```
159
160pub mod config;
161pub use config::PlatformSourceConfig;
162
163use anyhow::Result;
164use log::{debug, error, info, warn};
165use redis::streams::StreamReadReply;
166use serde_json::Value;
167use std::collections::HashMap;
168use std::sync::Arc;
169use std::time::Duration;
170use tokio::sync::RwLock;
171use tokio::task::JoinHandle;
172
173use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
174use drasi_lib::channels::{
175 ComponentEvent, ComponentEventSender, ComponentStatus, ComponentType, ControlOperation,
176 DispatchMode, SourceControl, SourceEvent, SourceEventWrapper, SubscriptionResponse,
177};
178use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
179use drasi_lib::sources::manager::convert_json_to_element_properties;
180use drasi_lib::Source;
181
182#[cfg(test)]
183mod tests;
184
/// Fully-resolved, internal configuration for the platform source.
///
/// Unlike the public [`PlatformSourceConfig`] (which leaves some fields
/// optional), every field here has a concrete value by the time the
/// consumer task starts — see `PlatformSource::start`, which fills in
/// the consumer name and retry settings.
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// Redis connection URL (e.g., `redis://localhost:6379`)
    redis_url: String,
    /// Redis stream key to read from
    stream_key: String,
    /// Consumer group name
    consumer_group: String,
    /// Consumer name (should be unique per instance within the group)
    consumer_name: String,
    /// Number of events to read per XREADGROUP call
    batch_size: usize,
    /// Milliseconds to block waiting for events
    block_ms: u64,
    /// Stream position to start from (">" for new, "0" for all)
    start_id: String,
    /// Always recreate consumer group on startup (default: false)
    /// If true, deletes and recreates the consumer group using start_id
    /// If false, uses existing group position (ignores start_id if group exists)
    always_create_consumer_group: bool,
    /// Maximum connection retry attempts
    max_retries: usize,
    /// Delay between retries in milliseconds (doubled after each failure)
    retry_delay_ms: u64,
}
211
212impl Default for PlatformConfig {
213 fn default() -> Self {
214 Self {
215 redis_url: String::new(),
216 stream_key: String::new(),
217 consumer_group: String::new(),
218 consumer_name: String::new(),
219 batch_size: 10,
220 block_ms: 5000,
221 start_id: ">".to_string(),
222 always_create_consumer_group: false,
223 max_retries: 3,
224 retry_delay_ms: 1000,
225 }
226 }
227}
228
/// Platform source that reads events from Redis Streams.
///
/// This source connects to a Redis instance and consumes CloudEvent-wrapped
/// messages from a stream using consumer groups. It supports both data events
/// (node/relation changes) and control events (query subscriptions).
///
/// Entries are acknowledged (XACK) only after they have been dispatched,
/// giving at-least-once delivery semantics.
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: Platform-specific configuration (Redis connection, stream settings)
pub struct PlatformSource {
    /// Base source implementation providing common functionality
    base: SourceBase,
    /// Platform source configuration (public, partially-optional form)
    config: PlatformSourceConfig,
}
245
/// Builder for creating [`PlatformSource`] instances.
///
/// Provides a fluent API for constructing platform sources
/// with sensible defaults.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_platform::PlatformSource;
///
/// let source = PlatformSource::builder("my-platform-source")
///     .with_redis_url("redis://localhost:6379")
///     .with_stream_key("my-app-changes")
///     .with_consumer_group("my-consumers")
///     .with_batch_size(50)
///     .build()?;
/// ```
pub struct PlatformSourceBuilder {
    /// Unique id of the source instance being built.
    id: String,
    /// Required: Redis connection URL (not validated until runtime).
    redis_url: String,
    /// Required: Redis stream key to consume from.
    stream_key: String,
    /// Defaults to "drasi-core" in `build()` when unset.
    consumer_group: Option<String>,
    /// Auto-generated from the source id at `start()` when unset.
    consumer_name: Option<String>,
    /// Defaults to 100 in `build()` when unset.
    batch_size: Option<usize>,
    /// Defaults to 5000 in `build()` when unset.
    block_ms: Option<u64>,
    /// Optional dispatch tuning forwarded to `SourceBaseParams`.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity forwarded to `SourceBaseParams`.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider forwarded to `SourceBaseParams`.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source auto-starts with DrasiLib (default: true).
    auto_start: bool,
}
276
277impl PlatformSourceBuilder {
278 /// Create a new builder with the given ID and default values.
279 pub fn new(id: impl Into<String>) -> Self {
280 Self {
281 id: id.into(),
282 redis_url: String::new(),
283 stream_key: String::new(),
284 consumer_group: None,
285 consumer_name: None,
286 batch_size: None,
287 block_ms: None,
288 dispatch_mode: None,
289 dispatch_buffer_capacity: None,
290 bootstrap_provider: None,
291 auto_start: true,
292 }
293 }
294
295 /// Set the Redis connection URL.
296 ///
297 /// # Arguments
298 ///
299 /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
300 pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
301 self.redis_url = url.into();
302 self
303 }
304
305 /// Set the Redis stream key to consume from.
306 ///
307 /// # Arguments
308 ///
309 /// * `key` - Name of the Redis stream
310 pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
311 self.stream_key = key.into();
312 self
313 }
314
315 /// Set the consumer group name.
316 ///
317 /// # Arguments
318 ///
319 /// * `group` - Consumer group name (default: `"drasi-core"`)
320 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
321 self.consumer_group = Some(group.into());
322 self
323 }
324
325 /// Set a unique consumer name within the group.
326 ///
327 /// # Arguments
328 ///
329 /// * `name` - Unique consumer name (auto-generated if not specified)
330 pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
331 self.consumer_name = Some(name.into());
332 self
333 }
334
335 /// Set the batch size for reading events.
336 ///
337 /// # Arguments
338 ///
339 /// * `size` - Number of events to read per XREADGROUP call (default: 100)
340 pub fn with_batch_size(mut self, size: usize) -> Self {
341 self.batch_size = Some(size);
342 self
343 }
344
345 /// Set the block timeout for waiting on events.
346 ///
347 /// # Arguments
348 ///
349 /// * `ms` - Milliseconds to block waiting for events (default: 5000)
350 pub fn with_block_ms(mut self, ms: u64) -> Self {
351 self.block_ms = Some(ms);
352 self
353 }
354
355 /// Set the dispatch mode for this source
356 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
357 self.dispatch_mode = Some(mode);
358 self
359 }
360
361 /// Set the dispatch buffer capacity for this source
362 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
363 self.dispatch_buffer_capacity = Some(capacity);
364 self
365 }
366
367 /// Set the bootstrap provider for this source
368 pub fn with_bootstrap_provider(
369 mut self,
370 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
371 ) -> Self {
372 self.bootstrap_provider = Some(Box::new(provider));
373 self
374 }
375
376 /// Set whether this source should auto-start when DrasiLib starts.
377 ///
378 /// Default is `true`. Set to `false` if this source should only be
379 /// started manually via `start_source()`.
380 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
381 self.auto_start = auto_start;
382 self
383 }
384
385 /// Set the full configuration at once
386 pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
387 self.redis_url = config.redis_url;
388 self.stream_key = config.stream_key;
389 self.consumer_group = Some(config.consumer_group);
390 self.consumer_name = config.consumer_name;
391 self.batch_size = Some(config.batch_size);
392 self.block_ms = Some(config.block_ms);
393 self
394 }
395
396 /// Build the platform source.
397 ///
398 /// # Errors
399 ///
400 /// Returns an error if the source cannot be constructed.
401 pub fn build(self) -> Result<PlatformSource> {
402 let config = PlatformSourceConfig {
403 redis_url: self.redis_url,
404 stream_key: self.stream_key,
405 consumer_group: self
406 .consumer_group
407 .unwrap_or_else(|| "drasi-core".to_string()),
408 consumer_name: self.consumer_name,
409 batch_size: self.batch_size.unwrap_or(100),
410 block_ms: self.block_ms.unwrap_or(5000),
411 };
412
413 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
414 if let Some(mode) = self.dispatch_mode {
415 params = params.with_dispatch_mode(mode);
416 }
417 if let Some(capacity) = self.dispatch_buffer_capacity {
418 params = params.with_dispatch_buffer_capacity(capacity);
419 }
420 if let Some(provider) = self.bootstrap_provider {
421 params = params.with_bootstrap_provider(provider);
422 }
423
424 Ok(PlatformSource {
425 base: SourceBase::new(params)?,
426 config,
427 })
428 }
429}
430
431impl PlatformSource {
432 /// Create a builder for PlatformSource
433 ///
434 /// # Example
435 ///
436 /// ```rust,ignore
437 /// use drasi_source_platform::PlatformSource;
438 ///
439 /// let source = PlatformSource::builder("my-platform-source")
440 /// .with_redis_url("redis://localhost:6379")
441 /// .with_stream_key("my-changes")
442 /// .with_bootstrap_provider(my_provider)
443 /// .build()?;
444 /// ```
445 pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
446 PlatformSourceBuilder::new(id)
447 }
448
449 /// Create a new platform source.
450 ///
451 /// The event channel is automatically injected when the source is added
452 /// to DrasiLib via `add_source()`.
453 ///
454 /// # Arguments
455 ///
456 /// * `id` - Unique identifier for this source instance
457 /// * `config` - Platform source configuration
458 ///
459 /// # Returns
460 ///
461 /// A new `PlatformSource` instance, or an error if construction fails.
462 ///
463 /// # Errors
464 ///
465 /// Returns an error if the base source cannot be initialized.
466 ///
467 /// # Example
468 ///
469 /// ```rust,ignore
470 /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
471 ///
472 /// let config = PlatformSourceBuilder::new("my-platform-source")
473 /// .with_redis_url("redis://localhost:6379")
474 /// .with_stream_key("my-changes")
475 /// .build()?;
476 /// ```
477 pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
478 let id = id.into();
479 let params = SourceBaseParams::new(id);
480 Ok(Self {
481 base: SourceBase::new(params)?,
482 config,
483 })
484 }
485
486 /// Subscribe to source events (for testing)
487 ///
488 /// This method is intended for use in tests to receive events broadcast by the source.
489 /// In production, queries subscribe to sources through the SourceManager.
490 /// Parse configuration from properties
491 #[allow(dead_code)]
492 fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
493 // Extract required fields first
494 let redis_url = properties
495 .get("redis_url")
496 .and_then(|v| v.as_str())
497 .ok_or_else(|| {
498 anyhow::anyhow!(
499 "Configuration error: Missing required field 'redis_url'. \
500 Platform source requires a Redis connection URL"
501 )
502 })?
503 .to_string();
504
505 let stream_key = properties
506 .get("stream_key")
507 .and_then(|v| v.as_str())
508 .ok_or_else(|| {
509 anyhow::anyhow!(
510 "Configuration error: Missing required field 'stream_key'. \
511 Platform source requires a Redis Stream key to read from"
512 )
513 })?
514 .to_string();
515
516 let consumer_group = properties
517 .get("consumer_group")
518 .and_then(|v| v.as_str())
519 .ok_or_else(|| {
520 anyhow::anyhow!(
521 "Configuration error: Missing required field 'consumer_group'. \
522 Platform source requires a consumer group name"
523 )
524 })?
525 .to_string();
526
527 let consumer_name = properties
528 .get("consumer_name")
529 .and_then(|v| v.as_str())
530 .ok_or_else(|| {
531 anyhow::anyhow!(
532 "Configuration error: Missing required field 'consumer_name'. \
533 Platform source requires a unique consumer name"
534 )
535 })?
536 .to_string();
537
538 // Get defaults for optional field handling
539 let defaults = PlatformConfig::default();
540
541 // Build config with optional fields
542 let config = PlatformConfig {
543 redis_url,
544 stream_key,
545 consumer_group,
546 consumer_name,
547 batch_size: properties
548 .get("batch_size")
549 .and_then(|v| v.as_u64())
550 .map(|v| v as usize)
551 .unwrap_or(defaults.batch_size),
552 block_ms: properties
553 .get("block_ms")
554 .and_then(|v| v.as_u64())
555 .unwrap_or(defaults.block_ms),
556 start_id: properties
557 .get("start_id")
558 .and_then(|v| v.as_str())
559 .map(|s| s.to_string())
560 .unwrap_or(defaults.start_id),
561 always_create_consumer_group: properties
562 .get("always_create_consumer_group")
563 .and_then(|v| v.as_bool())
564 .unwrap_or(defaults.always_create_consumer_group),
565 max_retries: properties
566 .get("max_retries")
567 .and_then(|v| v.as_u64())
568 .map(|v| v as usize)
569 .unwrap_or(defaults.max_retries),
570 retry_delay_ms: properties
571 .get("retry_delay_ms")
572 .and_then(|v| v.as_u64())
573 .unwrap_or(defaults.retry_delay_ms),
574 };
575
576 // Validate
577 if config.redis_url.is_empty() {
578 return Err(anyhow::anyhow!(
579 "Validation error: redis_url cannot be empty. \
580 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
581 ));
582 }
583 if config.stream_key.is_empty() {
584 return Err(anyhow::anyhow!(
585 "Validation error: stream_key cannot be empty. \
586 Please specify the Redis Stream key to read from"
587 ));
588 }
589 if config.consumer_group.is_empty() {
590 return Err(anyhow::anyhow!(
591 "Validation error: consumer_group cannot be empty. \
592 Please specify a consumer group name for this source"
593 ));
594 }
595 if config.consumer_name.is_empty() {
596 return Err(anyhow::anyhow!(
597 "Validation error: consumer_name cannot be empty. \
598 Please specify a unique consumer name within the consumer group"
599 ));
600 }
601
602 Ok(config)
603 }
604
605 /// Connect to Redis with retry logic
606 async fn connect_with_retry(
607 redis_url: &str,
608 max_retries: usize,
609 retry_delay_ms: u64,
610 ) -> Result<redis::aio::MultiplexedConnection> {
611 let client = redis::Client::open(redis_url)?;
612 let mut delay = retry_delay_ms;
613
614 for attempt in 0..max_retries {
615 match client.get_multiplexed_async_connection().await {
616 Ok(conn) => {
617 info!("Successfully connected to Redis");
618 return Ok(conn);
619 }
620 Err(e) if attempt < max_retries - 1 => {
621 warn!(
622 "Redis connection failed (attempt {}/{}): {}",
623 attempt + 1,
624 max_retries,
625 e
626 );
627 tokio::time::sleep(Duration::from_millis(delay)).await;
628 delay *= 2; // Exponential backoff
629 }
630 Err(e) => {
631 return Err(anyhow::anyhow!(
632 "Failed to connect to Redis after {max_retries} attempts: {e}"
633 ));
634 }
635 }
636 }
637
638 unreachable!()
639 }
640
    /// Create or recreate consumer group based on configuration
    ///
    /// Maps the configured `start_id` onto an XGROUP CREATE position:
    /// `">"` (consume only new messages) becomes `"$"` (end of stream),
    /// because `">"` is only meaningful for XREADGROUP, not group creation.
    ///
    /// When `always_create` is set, any existing group is destroyed first so
    /// the group restarts from `start_id`; destroy failures are logged and
    /// ignored. An already-existing group (BUSYGROUP) is treated as success.
    async fn create_consumer_group(
        conn: &mut redis::aio::MultiplexedConnection,
        stream_key: &str,
        consumer_group: &str,
        start_id: &str,
        always_create: bool,
    ) -> Result<()> {
        // Determine the initial position for the consumer group
        let group_start_id = if start_id == ">" {
            "$" // ">" means only new messages, so create group at end
        } else {
            start_id // "0" or specific ID
        };

        // If always_create is true, delete the existing group first
        if always_create {
            info!(
                "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
            );

            let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
                .arg("DESTROY")
                .arg(stream_key)
                .arg(consumer_group)
                .query_async(conn)
                .await;

            // XGROUP DESTROY returns the number of groups removed (0 or 1).
            match destroy_result {
                Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
                Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
                Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
                Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
            }
        }

        // Try to create the consumer group; MKSTREAM also creates the stream
        // itself when it does not exist yet.
        let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
            .arg("CREATE")
            .arg(stream_key)
            .arg(consumer_group)
            .arg(group_start_id)
            .arg("MKSTREAM")
            .query_async(conn)
            .await;

        match result {
            Ok(_) => {
                info!(
                    "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
                );
                Ok(())
            }
            Err(e) => {
                // BUSYGROUP error means the group already exists
                if e.to_string().contains("BUSYGROUP") {
                    info!(
                        "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
                    );
                    Ok(())
                } else {
                    Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
                }
            }
        }
    }
707
    /// Start the stream consumer task
    ///
    /// Spawns a Tokio task that:
    /// 1. Connects to Redis (with retry) and ensures the consumer group exists;
    ///    on unrecoverable failure it emits a `Stopped` event and exits.
    /// 2. Loops on XREADGROUP, transforming each entry into control or data
    ///    events and dispatching them to subscribers.
    /// 3. Batch-acknowledges (XACK) entries after processing.
    /// 4. On connection errors, tries to reconnect; if reconnection fails the
    ///    task marks the component `Stopped` and exits.
    async fn start_consumer_task(
        source_id: String,
        platform_config: PlatformConfig,
        dispatchers: Arc<
            RwLock<
                Vec<
                    Box<
                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
                    >,
                >,
            >,
        >,
        event_tx: Arc<RwLock<Option<ComponentEventSender>>>,
        status: Arc<RwLock<ComponentStatus>>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            info!(
                "Starting platform source consumer for source '{}' on stream '{}'",
                source_id, platform_config.stream_key
            );

            // Connect to Redis
            let mut conn = match Self::connect_with_retry(
                &platform_config.redis_url,
                platform_config.max_retries,
                platform_config.retry_delay_ms,
            )
            .await
            {
                Ok(conn) => conn,
                Err(e) => {
                    // Permanent connection failure: report Stopped and exit.
                    error!("Failed to connect to Redis: {e}");
                    if let Some(ref tx) = *event_tx.read().await {
                        let _ = tx
                            .send(ComponentEvent {
                                component_id: source_id.clone(),
                                component_type: ComponentType::Source,
                                status: ComponentStatus::Stopped,
                                timestamp: chrono::Utc::now(),
                                message: Some(format!("Failed to connect to Redis: {e}")),
                            })
                            .await;
                    }
                    *status.write().await = ComponentStatus::Stopped;
                    return;
                }
            };

            // Create consumer group
            if let Err(e) = Self::create_consumer_group(
                &mut conn,
                &platform_config.stream_key,
                &platform_config.consumer_group,
                &platform_config.start_id,
                platform_config.always_create_consumer_group,
            )
            .await
            {
                error!("Failed to create consumer group: {e}");
                if let Some(ref tx) = *event_tx.read().await {
                    let _ = tx
                        .send(ComponentEvent {
                            component_id: source_id.clone(),
                            component_type: ComponentType::Source,
                            status: ComponentStatus::Stopped,
                            timestamp: chrono::Utc::now(),
                            message: Some(format!("Failed to create consumer group: {e}")),
                        })
                        .await;
                }
                *status.write().await = ComponentStatus::Stopped;
                return;
            }

            // Main consumer loop
            loop {
                // Read from stream using ">" to get next undelivered messages for this consumer group
                let read_result: Result<StreamReadReply, redis::RedisError> =
                    redis::cmd("XREADGROUP")
                        .arg("GROUP")
                        .arg(&platform_config.consumer_group)
                        .arg(&platform_config.consumer_name)
                        .arg("COUNT")
                        .arg(platform_config.batch_size)
                        .arg("BLOCK")
                        .arg(platform_config.block_ms)
                        .arg("STREAMS")
                        .arg(&platform_config.stream_key)
                        .arg(">") // Always use ">" for consumer group reads
                        .query_async(&mut conn)
                        .await;

                match read_result {
                    Ok(reply) => {
                        // Collect all stream IDs for batch acknowledgment
                        let mut all_stream_ids = Vec::new();

                        // Process each stream entry
                        for stream_key in reply.keys {
                            for stream_id in stream_key.ids {
                                debug!("Received event from stream: {}", stream_id.id);

                                // Store stream ID for batch acknowledgment.
                                // NOTE(review): IDs are collected before
                                // parse/transform, so malformed entries are
                                // still ACKed (logged, not redelivered).
                                all_stream_ids.push(stream_id.id.clone());

                                // Extract event data
                                match extract_event_data(&stream_id.map) {
                                    Ok(event_json) => {
                                        // Parse JSON
                                        match serde_json::from_str::<Value>(&event_json) {
                                            Ok(cloud_event) => {
                                                // Detect message type
                                                let message_type =
                                                    detect_message_type(&cloud_event);

                                                match message_type {
                                                    MessageType::Control(control_type) => {
                                                        // Handle control message
                                                        debug!(
                                                            "Detected control message of type: {control_type}"
                                                        );

                                                        match transform_control_event(
                                                            cloud_event,
                                                            &control_type,
                                                        ) {
                                                            Ok(control_events) => {
                                                                // Publish control events
                                                                for control_event in control_events
                                                                {
                                                                    // Create profiling metadata with timestamps
                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());

                                                                    let wrapper = SourceEventWrapper::with_profiling(
                                                                        source_id.clone(),
                                                                        SourceEvent::Control(control_event),
                                                                        chrono::Utc::now(),
                                                                        profiling,
                                                                    );

                                                                    // Dispatch via helper
                                                                    if let Err(e) = SourceBase::dispatch_from_task(
                                                                        dispatchers.clone(),
                                                                        wrapper,
                                                                        &source_id,
                                                                    )
                                                                    .await
                                                                    {
                                                                        debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
                                                                    } else {
                                                                        debug!(
                                                                            "Published control event for stream {}",
                                                                            stream_id.id
                                                                        );
                                                                    }
                                                                }
                                                            }
                                                            Err(e) => {
                                                                warn!(
                                                                    "Failed to transform control event {}: {}",
                                                                    stream_id.id, e
                                                                );
                                                            }
                                                        }
                                                    }
                                                    MessageType::Data => {
                                                        // Handle data message
                                                        match transform_platform_event(
                                                            cloud_event,
                                                            &source_id,
                                                        ) {
                                                            Ok(source_changes_with_timestamps) => {
                                                                // Publish source changes
                                                                for item in
                                                                    source_changes_with_timestamps
                                                                {
                                                                    // Create profiling metadata with timestamps
                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());

                                                                    // Extract source_ns from SourceChange transaction time
                                                                    profiling.source_ns = Some(
                                                                        item.source_change
                                                                            .get_transaction_time(),
                                                                    );

                                                                    // Set reactivator timestamps from event
                                                                    profiling
                                                                        .reactivator_start_ns =
                                                                        item.reactivator_start_ns;
                                                                    profiling.reactivator_end_ns =
                                                                        item.reactivator_end_ns;

                                                                    let wrapper = SourceEventWrapper::with_profiling(
                                                                        source_id.clone(),
                                                                        SourceEvent::Change(item.source_change),
                                                                        chrono::Utc::now(),
                                                                        profiling,
                                                                    );

                                                                    // Dispatch via helper
                                                                    if let Err(e) = SourceBase::dispatch_from_task(
                                                                        dispatchers.clone(),
                                                                        wrapper,
                                                                        &source_id,
                                                                    )
                                                                    .await
                                                                    {
                                                                        debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
                                                                    } else {
                                                                        debug!(
                                                                            "Published source change for event {}",
                                                                            stream_id.id
                                                                        );
                                                                    }
                                                                }
                                                            }
                                                            Err(e) => {
                                                                warn!(
                                                                    "Failed to transform event {}: {}",
                                                                    stream_id.id, e
                                                                );
                                                                if let Some(ref tx) =
                                                                    *event_tx.read().await
                                                                {
                                                                    let _ = tx
                                                                        .send(ComponentEvent {
                                                                            component_id: source_id.clone(),
                                                                            component_type:
                                                                                ComponentType::Source,
                                                                            status: ComponentStatus::Running,
                                                                            timestamp: chrono::Utc::now(),
                                                                            message: Some(format!(
                                                                                "Transformation error: {e}"
                                                                            )),
                                                                        })
                                                                        .await;
                                                                }
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                            Err(e) => {
                                                warn!(
                                                    "Failed to parse JSON for event {}: {}",
                                                    stream_id.id, e
                                                );
                                            }
                                        }
                                    }
                                    Err(e) => {
                                        warn!(
                                            "Failed to extract event data from {}: {}",
                                            stream_id.id, e
                                        );
                                    }
                                }
                            }
                        }

                        // Batch acknowledge all messages at once
                        if !all_stream_ids.is_empty() {
                            debug!("Acknowledging batch of {} messages", all_stream_ids.len());

                            let mut cmd = redis::cmd("XACK");
                            cmd.arg(&platform_config.stream_key)
                                .arg(&platform_config.consumer_group);

                            // Add all stream IDs to the command
                            for stream_id in &all_stream_ids {
                                cmd.arg(stream_id);
                            }

                            match cmd.query_async::<_, i64>(&mut conn).await {
                                Ok(ack_count) => {
                                    debug!("Successfully acknowledged {ack_count} messages");
                                    // XACK returns how many entries were
                                    // actually acknowledged; a mismatch means
                                    // some IDs were not pending for this group.
                                    if ack_count as usize != all_stream_ids.len() {
                                        warn!(
                                            "Acknowledged {} messages but expected {}",
                                            ack_count,
                                            all_stream_ids.len()
                                        );
                                    }
                                }
                                Err(e) => {
                                    error!("Failed to acknowledge message batch: {e}");

                                    // Fallback: Try acknowledging messages individually
                                    warn!("Falling back to individual acknowledgments");
                                    for stream_id in &all_stream_ids {
                                        match redis::cmd("XACK")
                                            .arg(&platform_config.stream_key)
                                            .arg(&platform_config.consumer_group)
                                            .arg(stream_id)
                                            .query_async::<_, i64>(&mut conn)
                                            .await
                                        {
                                            Ok(_) => {
                                                debug!(
                                                    "Individually acknowledged message {stream_id}"
                                                );
                                            }
                                            Err(e) => {
                                                error!(
                                                    "Failed to individually acknowledge message {stream_id}: {e}"
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        // Check if it's a connection error
                        if is_connection_error(&e) {
                            error!("Redis connection lost: {e}");
                            if let Some(ref tx) = *event_tx.read().await {
                                let _ = tx
                                    .send(ComponentEvent {
                                        component_id: source_id.clone(),
                                        component_type: ComponentType::Source,
                                        status: ComponentStatus::Running,
                                        timestamp: chrono::Utc::now(),
                                        message: Some(format!("Redis connection lost: {e}")),
                                    })
                                    .await;
                            }

                            // Try to reconnect
                            match Self::connect_with_retry(
                                &platform_config.redis_url,
                                platform_config.max_retries,
                                platform_config.retry_delay_ms,
                            )
                            .await
                            {
                                Ok(new_conn) => {
                                    conn = new_conn;
                                    info!("Reconnected to Redis");
                                }
                                Err(e) => {
                                    error!("Failed to reconnect to Redis: {e}");
                                    *status.write().await = ComponentStatus::Stopped;
                                    return;
                                }
                            }
                        } else if !e.to_string().contains("timeout") {
                            // Log non-timeout errors (BLOCK expiry surfaces as
                            // a timeout and is expected when the stream is idle)
                            error!("Error reading from stream: {e}");
                        }
                    }
                }
            }
        })
    }
1067}
1068
1069#[async_trait::async_trait]
1070impl Source for PlatformSource {
1071 fn id(&self) -> &str {
1072 &self.base.id
1073 }
1074
1075 fn type_name(&self) -> &str {
1076 "platform"
1077 }
1078
1079 fn properties(&self) -> HashMap<String, serde_json::Value> {
1080 let mut props = HashMap::new();
1081 props.insert(
1082 "redis_url".to_string(),
1083 serde_json::Value::String(self.config.redis_url.clone()),
1084 );
1085 props.insert(
1086 "stream_key".to_string(),
1087 serde_json::Value::String(self.config.stream_key.clone()),
1088 );
1089 props.insert(
1090 "consumer_group".to_string(),
1091 serde_json::Value::String(self.config.consumer_group.clone()),
1092 );
1093 if let Some(ref consumer_name) = self.config.consumer_name {
1094 props.insert(
1095 "consumer_name".to_string(),
1096 serde_json::Value::String(consumer_name.clone()),
1097 );
1098 }
1099 props.insert(
1100 "batch_size".to_string(),
1101 serde_json::Value::Number(self.config.batch_size.into()),
1102 );
1103 props.insert(
1104 "block_ms".to_string(),
1105 serde_json::Value::Number(self.config.block_ms.into()),
1106 );
1107 props
1108 }
1109
1110 fn auto_start(&self) -> bool {
1111 self.base.get_auto_start()
1112 }
1113
1114 async fn start(&self) -> Result<()> {
1115 info!("Starting platform source: {}", self.base.id);
1116
1117 // Extract configuration from typed config
1118 let platform_config = PlatformConfig {
1119 redis_url: self.config.redis_url.clone(),
1120 stream_key: self.config.stream_key.clone(),
1121 consumer_group: self.config.consumer_group.clone(),
1122 consumer_name: self
1123 .config
1124 .consumer_name
1125 .clone()
1126 .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1127 batch_size: self.config.batch_size,
1128 block_ms: self.config.block_ms,
1129 start_id: ">".to_string(),
1130 always_create_consumer_group: false,
1131 max_retries: 5,
1132 retry_delay_ms: 1000,
1133 };
1134
1135 // Update status
1136 *self.base.status.write().await = ComponentStatus::Running;
1137
1138 // Start consumer task
1139 let task = Self::start_consumer_task(
1140 self.base.id.clone(),
1141 platform_config,
1142 self.base.dispatchers.clone(),
1143 self.base.status_tx(),
1144 self.base.status.clone(),
1145 )
1146 .await;
1147
1148 *self.base.task_handle.write().await = Some(task);
1149
1150 Ok(())
1151 }
1152
1153 async fn stop(&self) -> Result<()> {
1154 info!("Stopping platform source: {}", self.base.id);
1155
1156 // Cancel the task
1157 if let Some(handle) = self.base.task_handle.write().await.take() {
1158 handle.abort();
1159 }
1160
1161 // Update status
1162 *self.base.status.write().await = ComponentStatus::Stopped;
1163
1164 Ok(())
1165 }
1166
1167 async fn status(&self) -> ComponentStatus {
1168 self.base.status.read().await.clone()
1169 }
1170
1171 async fn subscribe(
1172 &self,
1173 settings: drasi_lib::config::SourceSubscriptionSettings,
1174 ) -> Result<SubscriptionResponse> {
1175 self.base
1176 .subscribe_with_bootstrap(&settings, "Platform")
1177 .await
1178 }
1179
1180 fn as_any(&self) -> &dyn std::any::Any {
1181 self
1182 }
1183
1184 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1185 self.base.initialize(context).await;
1186 }
1187
1188 async fn set_bootstrap_provider(
1189 &self,
1190 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1191 ) {
1192 self.base.set_bootstrap_provider(provider).await;
1193 }
1194}
1195
1196impl PlatformSource {
1197 /// Create a test subscription to this source (synchronous)
1198 ///
1199 /// This method delegates to SourceBase and is provided for convenience in tests.
1200 /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1201 pub fn test_subscribe(
1202 &self,
1203 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1204 self.base.test_subscribe()
1205 }
1206
1207 /// Create a test subscription to this source (async)
1208 ///
1209 /// This method delegates to SourceBase and is provided for convenience in async tests.
1210 /// Prefer this method over test_subscribe() in async contexts.
1211 pub async fn test_subscribe_async(
1212 &self,
1213 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1214 self.base
1215 .create_streaming_receiver()
1216 .await
1217 .expect("Failed to create test subscription")
1218 }
1219}
1220
1221/// Extract event data from Redis stream entry
1222fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1223 // Look for common field names
1224 for key in &["data", "event", "payload", "message"] {
1225 if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1226 return String::from_utf8(data.clone())
1227 .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1228 }
1229 }
1230
1231 Err(anyhow::anyhow!(
1232 "No event data found in stream entry. Available keys: {:?}",
1233 entry_map.keys().collect::<Vec<_>>()
1234 ))
1235}
1236
1237/// Check if error is a connection error
1238fn is_connection_error(e: &redis::RedisError) -> bool {
1239 e.is_connection_dropped()
1240 || e.is_io_error()
1241 || e.to_string().contains("connection")
1242 || e.to_string().contains("EOF")
1243}
1244
/// Message type discriminator
///
/// Produced by `detect_message_type` to route an incoming CloudEvent to
/// either the control-event or data-event transformation path.
#[derive(Debug, Clone, PartialEq)]
enum MessageType {
    /// Control message with control type from source.table
    /// (e.g. "SourceSubscription")
    Control(String),
    /// Data message (node/relation change)
    Data,
}
1253
1254/// Detect message type based on payload.source.db field
1255///
1256/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1257/// Returns MessageType::Data for all other cases
1258fn detect_message_type(cloud_event: &Value) -> MessageType {
1259 // Extract data array and get first event to check message type
1260 let data_array = match cloud_event["data"].as_array() {
1261 Some(arr) if !arr.is_empty() => arr,
1262 _ => return MessageType::Data, // Default to data if no data array
1263 };
1264
1265 // Check the first event's source.db field
1266 let first_event = &data_array[0];
1267 let source_db = first_event["payload"]["source"]["db"]
1268 .as_str()
1269 .unwrap_or("");
1270
1271 // Case-insensitive comparison with "Drasi"
1272 if source_db.eq_ignore_ascii_case("drasi") {
1273 // Extract source.table to determine control type
1274 let control_type = first_event["payload"]["source"]["table"]
1275 .as_str()
1276 .unwrap_or("Unknown")
1277 .to_string();
1278 MessageType::Control(control_type)
1279 } else {
1280 MessageType::Data
1281 }
1282}
1283
/// Helper struct to hold SourceChange along with reactivator timestamps
#[derive(Debug)]
struct SourceChangeWithTimestamps {
    // The translated drasi-core change event.
    source_change: SourceChange,
    // Nanosecond timestamp from the event's `reactivatorStart_ns` field, if present.
    reactivator_start_ns: Option<u64>,
    // Nanosecond timestamp from the event's `reactivatorEnd_ns` field, if present.
    reactivator_end_ns: Option<u64>,
}
1291
/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
///
/// Handles events in CloudEvent format with a data array containing change events.
/// Each event in the data array has: op, payload.after/before, payload.source
///
/// Unlike control-event handling, any malformed entry fails the WHOLE batch:
/// the first invalid field encountered returns an error and no partial
/// results are produced.
///
/// # Errors
///
/// Returns an error when the `data` array is missing, or when any event lacks
/// a valid `op`, `payload`, `payload.source.table`, element data, `id`,
/// `labels`, `ts_ns`, `properties` (for insert/update), or — for relations —
/// `startId`/`endId`.
fn transform_platform_event(
    cloud_event: Value,
    source_id: &str,
) -> Result<Vec<SourceChangeWithTimestamps>> {
    let mut source_changes = Vec::new();

    // Extract the data array from CloudEvent wrapper
    let data_array = cloud_event["data"]
        .as_array()
        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;

    // Process each event in the data array
    for event in data_array {
        // Extract reactivator timestamps from top-level event fields
        // (optional; absent fields simply yield None)
        let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
        let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();

        // Extract operation type (op field: "i", "u", "d")
        let op = event["op"]
            .as_str()
            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;

        // Extract payload
        let payload = &event["payload"];
        if payload.is_null() {
            return Err(anyhow::anyhow!("Missing 'payload' field"));
        }

        // Extract element type from payload.source.table
        // (expected values: "node", "rel", or "relation" — validated below)
        let element_type = payload["source"]["table"]
            .as_str()
            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;

        // Get element data based on operation:
        // inserts/updates carry the new state, deletes carry the prior state
        let element_data = match op {
            "i" | "u" => &payload["after"],
            "d" => &payload["before"],
            _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
        };

        if element_data.is_null() {
            return Err(anyhow::anyhow!(
                "Missing element data (after/before) for operation {op}"
            ));
        }

        // Extract element ID
        let element_id = element_data["id"]
            .as_str()
            .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;

        // Extract labels
        let labels_array = element_data["labels"]
            .as_array()
            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;

        // Non-string entries are silently dropped here; the emptiness check
        // below only fails when NO valid string labels remain.
        let labels: Vec<Arc<str>> = labels_array
            .iter()
            .filter_map(|v| v.as_str().map(Arc::from))
            .collect();

        if labels.is_empty() {
            return Err(anyhow::anyhow!("Labels array is empty or invalid"));
        }

        // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
        let ts_ns = payload["source"]["ts_ns"]
            .as_u64()
            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
        let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds

        // Build ElementMetadata
        let reference = ElementReference::new(source_id, element_id);
        let metadata = ElementMetadata {
            reference,
            labels: labels.into(),
            effective_from,
        };

        // Handle delete operation (no properties needed)
        if op == "d" {
            source_changes.push(SourceChangeWithTimestamps {
                source_change: SourceChange::Delete { metadata },
                reactivator_start_ns,
                reactivator_end_ns,
            });
            continue;
        }

        // Convert properties (required for insert/update)
        let properties_obj = element_data["properties"]
            .as_object()
            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;

        let properties = convert_json_to_element_properties(properties_obj)?;

        // Build element based on type
        let element = match element_type {
            "node" => Element::Node {
                metadata,
                properties,
            },
            "rel" | "relation" => {
                // Extract startId and endId
                let start_id = element_data["startId"]
                    .as_str()
                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
                let end_id = element_data["endId"]
                    .as_str()
                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;

                Element::Relation {
                    metadata,
                    properties,
                    out_node: ElementReference::new(source_id, start_id),
                    in_node: ElementReference::new(source_id, end_id),
                }
            }
            _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
        };

        // Build SourceChange
        let source_change = match op {
            "i" => SourceChange::Insert { element },
            "u" => SourceChange::Update { element },
            // "d" returned via `continue` above; other ops already rejected
            _ => unreachable!(),
        };

        source_changes.push(SourceChangeWithTimestamps {
            source_change,
            reactivator_start_ns,
            reactivator_end_ns,
        });
    }

    Ok(source_changes)
}
1433
1434/// Transform CloudEvent-wrapped control event to SourceControl(s)
1435///
1436/// Handles control messages from Query API service with source.db = "Drasi"
1437/// Currently supports "SourceSubscription" control type
1438fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1439 let mut control_events = Vec::new();
1440
1441 // Extract the data array from CloudEvent wrapper
1442 let data_array = cloud_event["data"]
1443 .as_array()
1444 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1445
1446 // Check if control type is supported
1447 if control_type != "SourceSubscription" {
1448 info!(
1449 "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1450 );
1451 return Ok(control_events); // Return empty vec for unknown types
1452 }
1453
1454 // Process each event in the data array
1455 for event in data_array {
1456 // Extract operation type (op field: "i", "u", "d")
1457 let op = event["op"]
1458 .as_str()
1459 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1460
1461 // Extract payload
1462 let payload = &event["payload"];
1463 if payload.is_null() {
1464 warn!("Missing 'payload' field in control event, skipping");
1465 continue;
1466 }
1467
1468 // Get data based on operation
1469 let control_data = match op {
1470 "i" | "u" => &payload["after"],
1471 "d" => &payload["before"],
1472 _ => {
1473 warn!("Unknown operation type in control event: {op}, skipping");
1474 continue;
1475 }
1476 };
1477
1478 if control_data.is_null() {
1479 warn!("Missing control data (after/before) for operation {op}, skipping");
1480 continue;
1481 }
1482
1483 // Extract required fields for SourceSubscription
1484 let query_id = match control_data["queryId"].as_str() {
1485 Some(id) => id.to_string(),
1486 None => {
1487 warn!("Missing required 'queryId' field in control event, skipping");
1488 continue;
1489 }
1490 };
1491
1492 let query_node_id = match control_data["queryNodeId"].as_str() {
1493 Some(id) => id.to_string(),
1494 None => {
1495 warn!("Missing required 'queryNodeId' field in control event, skipping");
1496 continue;
1497 }
1498 };
1499
1500 // Extract optional label arrays (default to empty if missing)
1501 let node_labels = control_data["nodeLabels"]
1502 .as_array()
1503 .map(|arr| {
1504 arr.iter()
1505 .filter_map(|v| v.as_str().map(String::from))
1506 .collect()
1507 })
1508 .unwrap_or_default();
1509
1510 let rel_labels = control_data["relLabels"]
1511 .as_array()
1512 .map(|arr| {
1513 arr.iter()
1514 .filter_map(|v| v.as_str().map(String::from))
1515 .collect()
1516 })
1517 .unwrap_or_default();
1518
1519 // Map operation to ControlOperation
1520 let operation = match op {
1521 "i" => ControlOperation::Insert,
1522 "u" => ControlOperation::Update,
1523 "d" => ControlOperation::Delete,
1524 _ => unreachable!(), // Already filtered above
1525 };
1526
1527 // Build SourceControl::Subscription
1528 let control_event = SourceControl::Subscription {
1529 query_id,
1530 query_node_id,
1531 node_labels,
1532 rel_labels,
1533 operation,
1534 };
1535
1536 control_events.push(control_event);
1537 }
1538
1539 Ok(control_events)
1540}