drasi_source_platform/lib.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform Source Plugin for Drasi
16//!
17//! This plugin consumes data change events from Redis Streams, which is the primary
18//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
19//! containing node and relation changes, as well as control events for query subscriptions.
20//!
21//! # Architecture
22//!
23//! The platform source connects to Redis as a consumer group member, enabling:
24//! - **At-least-once delivery**: Messages are acknowledged after processing
25//! - **Horizontal scaling**: Multiple consumers can share the workload
26//! - **Fault tolerance**: Unacknowledged messages are redelivered
27//!
28//! # Configuration
29//!
30//! | Field | Type | Default | Description |
31//! |-------|------|---------|-------------|
32//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
33//! | `stream_key` | string | *required* | Redis stream key to consume from |
34//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
35//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
36//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
37//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
38//!
39//! # Data Format
40//!
41//! The platform source expects CloudEvent-wrapped messages with a `data` array
42//! containing change events. Each event includes an operation type and payload.
43//!
44//! ## Node Insert
45//!
46//! ```json
47//! {
48//! "data": [{
49//! "op": "i",
50//! "payload": {
51//! "after": {
52//! "id": "user-123",
53//! "labels": ["User"],
54//! "properties": {
55//! "name": "Alice",
56//! "email": "alice@example.com"
57//! }
58//! },
59//! "source": {
60//! "db": "mydb",
61//! "table": "node",
62//! "ts_ns": 1699900000000000000
63//! }
64//! }
65//! }]
66//! }
67//! ```
68//!
69//! ## Node Update
70//!
71//! ```json
72//! {
73//! "data": [{
74//! "op": "u",
75//! "payload": {
76//! "after": {
77//! "id": "user-123",
78//! "labels": ["User"],
79//! "properties": { "name": "Alice Updated" }
80//! },
81//! "source": { "table": "node", "ts_ns": 1699900001000000000 }
82//! }
83//! }]
84//! }
85//! ```
86//!
87//! ## Node Delete
88//!
89//! ```json
90//! {
91//! "data": [{
92//! "op": "d",
93//! "payload": {
94//! "before": {
95//! "id": "user-123",
96//! "labels": ["User"],
97//! "properties": {}
98//! },
99//! "source": { "table": "node", "ts_ns": 1699900002000000000 }
100//! }
101//! }]
102//! }
103//! ```
104//!
105//! ## Relation Insert
106//!
107//! ```json
108//! {
109//! "data": [{
110//! "op": "i",
111//! "payload": {
112//! "after": {
113//! "id": "follows-1",
114//! "labels": ["FOLLOWS"],
115//! "startId": "user-123",
116//! "endId": "user-456",
117//! "properties": { "since": "2024-01-01" }
118//! },
119//! "source": { "table": "rel", "ts_ns": 1699900003000000000 }
120//! }
121//! }]
122//! }
123//! ```
124//!
125//! # Control Events
126//!
127//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
128//! Currently supported control types:
129//!
130//! - **SourceSubscription**: Query subscription management
131//!
132//! # Example Configuration (YAML)
133//!
134//! ```yaml
135//! source_type: platform
136//! properties:
137//! redis_url: "redis://localhost:6379"
138//! stream_key: "my-app-changes"
139//! consumer_group: "drasi-consumers"
140//! batch_size: 50
141//! block_ms: 10000
142//! ```
143//!
144//! # Usage Example
145//!
//! ```rust,ignore
//! use drasi_source_platform::PlatformSource;
//! use std::sync::Arc;
//!
//! // `builder` takes the source id; `build` returns the ready-to-use source.
//! let source = PlatformSource::builder("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
//! ```
159
160pub mod config;
161pub use config::PlatformSourceConfig;
162
163use anyhow::Result;
164use log::{debug, error, info, warn};
165use redis::streams::StreamReadReply;
166use serde_json::Value;
167use std::collections::HashMap;
168use std::sync::Arc;
169use std::time::Duration;
170use tokio::sync::RwLock;
171use tokio::task::JoinHandle;
172
173use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
174use drasi_lib::channels::{
175 ComponentEvent, ComponentEventSender, ComponentStatus, ComponentType, ControlOperation,
176 DispatchMode, SourceControl, SourceEvent, SourceEventWrapper, SubscriptionResponse,
177};
178use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
179use drasi_lib::sources::manager::convert_json_to_element_properties;
180use drasi_lib::Source;
181use tracing::Instrument;
182
183#[cfg(test)]
184mod tests;
185
/// Fully-resolved settings handed to the Redis stream consumer task.
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// URL used to open the Redis client.
    redis_url: String,
    /// Key of the Redis stream that is consumed.
    stream_key: String,
    /// Consumer group used for `XREADGROUP` and `XACK`.
    consumer_group: String,
    /// Name of this consumer inside the group (should be unique per instance).
    consumer_name: String,
    /// `COUNT` argument of every `XREADGROUP` call.
    batch_size: usize,
    /// `BLOCK` argument (milliseconds) of every `XREADGROUP` call.
    block_ms: u64,
    /// Position a newly created group starts from (`">"` for new entries only, `"0"` for all).
    start_id: String,
    /// When `true`, the consumer group is deleted and recreated at `start_id` on startup.
    /// When `false`, an existing group keeps its position and `start_id` is ignored.
    always_create_consumer_group: bool,
    /// Maximum number of connection attempts.
    max_retries: usize,
    /// Delay before the first reconnection attempt, in milliseconds.
    retry_delay_ms: u64,
}

impl Default for PlatformConfig {
    /// Empty connection fields plus the stock tuning values.
    fn default() -> Self {
        PlatformConfig {
            // Connection identity has no sensible default; callers must fill it in.
            redis_url: String::default(),
            stream_key: String::default(),
            consumer_group: String::default(),
            consumer_name: String::default(),
            // Read tuning.
            batch_size: 10,
            block_ms: 5000,
            // Group creation behaviour.
            start_id: String::from(">"),
            always_create_consumer_group: false,
            // Connection retry policy.
            max_retries: 3,
            retry_delay_ms: 1000,
        }
    }
}
229
230/// Platform source that reads events from Redis Streams.
231///
232/// This source connects to a Redis instance and consumes CloudEvent-wrapped
233/// messages from a stream using consumer groups. It supports both data events
234/// (node/relation changes) and control events (query subscriptions).
235///
236/// # Fields
237///
238/// - `base`: Common source functionality (dispatchers, status, lifecycle)
239/// - `config`: Platform-specific configuration (Redis connection, stream settings)
240pub struct PlatformSource {
241 /// Base source implementation providing common functionality
242 base: SourceBase,
243 /// Platform source configuration
244 config: PlatformSourceConfig,
245}
246
247/// Builder for creating [`PlatformSource`] instances.
248///
249/// Provides a fluent API for constructing platform sources
250/// with sensible defaults.
251///
252/// # Example
253///
254/// ```rust,ignore
255/// use drasi_source_platform::PlatformSource;
256///
257/// let source = PlatformSource::builder("my-platform-source")
258/// .with_redis_url("redis://localhost:6379")
259/// .with_stream_key("my-app-changes")
260/// .with_consumer_group("my-consumers")
261/// .with_batch_size(50)
262/// .build()?;
263/// ```
264pub struct PlatformSourceBuilder {
265 id: String,
266 redis_url: String,
267 stream_key: String,
268 consumer_group: Option<String>,
269 consumer_name: Option<String>,
270 batch_size: Option<usize>,
271 block_ms: Option<u64>,
272 dispatch_mode: Option<DispatchMode>,
273 dispatch_buffer_capacity: Option<usize>,
274 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
275 auto_start: bool,
276}
277
278impl PlatformSourceBuilder {
279 /// Create a new builder with the given ID and default values.
280 pub fn new(id: impl Into<String>) -> Self {
281 Self {
282 id: id.into(),
283 redis_url: String::new(),
284 stream_key: String::new(),
285 consumer_group: None,
286 consumer_name: None,
287 batch_size: None,
288 block_ms: None,
289 dispatch_mode: None,
290 dispatch_buffer_capacity: None,
291 bootstrap_provider: None,
292 auto_start: true,
293 }
294 }
295
296 /// Set the Redis connection URL.
297 ///
298 /// # Arguments
299 ///
300 /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
301 pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
302 self.redis_url = url.into();
303 self
304 }
305
306 /// Set the Redis stream key to consume from.
307 ///
308 /// # Arguments
309 ///
310 /// * `key` - Name of the Redis stream
311 pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
312 self.stream_key = key.into();
313 self
314 }
315
316 /// Set the consumer group name.
317 ///
318 /// # Arguments
319 ///
320 /// * `group` - Consumer group name (default: `"drasi-core"`)
321 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
322 self.consumer_group = Some(group.into());
323 self
324 }
325
326 /// Set a unique consumer name within the group.
327 ///
328 /// # Arguments
329 ///
330 /// * `name` - Unique consumer name (auto-generated if not specified)
331 pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
332 self.consumer_name = Some(name.into());
333 self
334 }
335
336 /// Set the batch size for reading events.
337 ///
338 /// # Arguments
339 ///
340 /// * `size` - Number of events to read per XREADGROUP call (default: 100)
341 pub fn with_batch_size(mut self, size: usize) -> Self {
342 self.batch_size = Some(size);
343 self
344 }
345
346 /// Set the block timeout for waiting on events.
347 ///
348 /// # Arguments
349 ///
350 /// * `ms` - Milliseconds to block waiting for events (default: 5000)
351 pub fn with_block_ms(mut self, ms: u64) -> Self {
352 self.block_ms = Some(ms);
353 self
354 }
355
356 /// Set the dispatch mode for this source
357 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
358 self.dispatch_mode = Some(mode);
359 self
360 }
361
362 /// Set the dispatch buffer capacity for this source
363 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
364 self.dispatch_buffer_capacity = Some(capacity);
365 self
366 }
367
368 /// Set the bootstrap provider for this source
369 pub fn with_bootstrap_provider(
370 mut self,
371 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
372 ) -> Self {
373 self.bootstrap_provider = Some(Box::new(provider));
374 self
375 }
376
377 /// Set whether this source should auto-start when DrasiLib starts.
378 ///
379 /// Default is `true`. Set to `false` if this source should only be
380 /// started manually via `start_source()`.
381 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
382 self.auto_start = auto_start;
383 self
384 }
385
386 /// Set the full configuration at once
387 pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
388 self.redis_url = config.redis_url;
389 self.stream_key = config.stream_key;
390 self.consumer_group = Some(config.consumer_group);
391 self.consumer_name = config.consumer_name;
392 self.batch_size = Some(config.batch_size);
393 self.block_ms = Some(config.block_ms);
394 self
395 }
396
397 /// Build the platform source.
398 ///
399 /// # Errors
400 ///
401 /// Returns an error if the source cannot be constructed.
402 pub fn build(self) -> Result<PlatformSource> {
403 let config = PlatformSourceConfig {
404 redis_url: self.redis_url,
405 stream_key: self.stream_key,
406 consumer_group: self
407 .consumer_group
408 .unwrap_or_else(|| "drasi-core".to_string()),
409 consumer_name: self.consumer_name,
410 batch_size: self.batch_size.unwrap_or(100),
411 block_ms: self.block_ms.unwrap_or(5000),
412 };
413
414 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
415 if let Some(mode) = self.dispatch_mode {
416 params = params.with_dispatch_mode(mode);
417 }
418 if let Some(capacity) = self.dispatch_buffer_capacity {
419 params = params.with_dispatch_buffer_capacity(capacity);
420 }
421 if let Some(provider) = self.bootstrap_provider {
422 params = params.with_bootstrap_provider(provider);
423 }
424
425 Ok(PlatformSource {
426 base: SourceBase::new(params)?,
427 config,
428 })
429 }
430}
431
432impl PlatformSource {
433 /// Create a builder for PlatformSource
434 ///
435 /// # Example
436 ///
437 /// ```rust,ignore
438 /// use drasi_source_platform::PlatformSource;
439 ///
440 /// let source = PlatformSource::builder("my-platform-source")
441 /// .with_redis_url("redis://localhost:6379")
442 /// .with_stream_key("my-changes")
443 /// .with_bootstrap_provider(my_provider)
444 /// .build()?;
445 /// ```
446 pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
447 PlatformSourceBuilder::new(id)
448 }
449
450 /// Create a new platform source.
451 ///
452 /// The event channel is automatically injected when the source is added
453 /// to DrasiLib via `add_source()`.
454 ///
455 /// # Arguments
456 ///
457 /// * `id` - Unique identifier for this source instance
458 /// * `config` - Platform source configuration
459 ///
460 /// # Returns
461 ///
462 /// A new `PlatformSource` instance, or an error if construction fails.
463 ///
464 /// # Errors
465 ///
466 /// Returns an error if the base source cannot be initialized.
467 ///
468 /// # Example
469 ///
470 /// ```rust,ignore
471 /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
472 ///
473 /// let config = PlatformSourceBuilder::new("my-platform-source")
474 /// .with_redis_url("redis://localhost:6379")
475 /// .with_stream_key("my-changes")
476 /// .build()?;
477 /// ```
478 pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
479 let id = id.into();
480 let params = SourceBaseParams::new(id);
481 Ok(Self {
482 base: SourceBase::new(params)?,
483 config,
484 })
485 }
486
487 /// Subscribe to source events (for testing)
488 ///
489 /// This method is intended for use in tests to receive events broadcast by the source.
490 /// In production, queries subscribe to sources through the SourceManager.
491 /// Parse configuration from properties
492 #[allow(dead_code)]
493 fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
494 // Extract required fields first
495 let redis_url = properties
496 .get("redis_url")
497 .and_then(|v| v.as_str())
498 .ok_or_else(|| {
499 anyhow::anyhow!(
500 "Configuration error: Missing required field 'redis_url'. \
501 Platform source requires a Redis connection URL"
502 )
503 })?
504 .to_string();
505
506 let stream_key = properties
507 .get("stream_key")
508 .and_then(|v| v.as_str())
509 .ok_or_else(|| {
510 anyhow::anyhow!(
511 "Configuration error: Missing required field 'stream_key'. \
512 Platform source requires a Redis Stream key to read from"
513 )
514 })?
515 .to_string();
516
517 let consumer_group = properties
518 .get("consumer_group")
519 .and_then(|v| v.as_str())
520 .ok_or_else(|| {
521 anyhow::anyhow!(
522 "Configuration error: Missing required field 'consumer_group'. \
523 Platform source requires a consumer group name"
524 )
525 })?
526 .to_string();
527
528 let consumer_name = properties
529 .get("consumer_name")
530 .and_then(|v| v.as_str())
531 .ok_or_else(|| {
532 anyhow::anyhow!(
533 "Configuration error: Missing required field 'consumer_name'. \
534 Platform source requires a unique consumer name"
535 )
536 })?
537 .to_string();
538
539 // Get defaults for optional field handling
540 let defaults = PlatformConfig::default();
541
542 // Build config with optional fields
543 let config = PlatformConfig {
544 redis_url,
545 stream_key,
546 consumer_group,
547 consumer_name,
548 batch_size: properties
549 .get("batch_size")
550 .and_then(|v| v.as_u64())
551 .map(|v| v as usize)
552 .unwrap_or(defaults.batch_size),
553 block_ms: properties
554 .get("block_ms")
555 .and_then(|v| v.as_u64())
556 .unwrap_or(defaults.block_ms),
557 start_id: properties
558 .get("start_id")
559 .and_then(|v| v.as_str())
560 .map(|s| s.to_string())
561 .unwrap_or(defaults.start_id),
562 always_create_consumer_group: properties
563 .get("always_create_consumer_group")
564 .and_then(|v| v.as_bool())
565 .unwrap_or(defaults.always_create_consumer_group),
566 max_retries: properties
567 .get("max_retries")
568 .and_then(|v| v.as_u64())
569 .map(|v| v as usize)
570 .unwrap_or(defaults.max_retries),
571 retry_delay_ms: properties
572 .get("retry_delay_ms")
573 .and_then(|v| v.as_u64())
574 .unwrap_or(defaults.retry_delay_ms),
575 };
576
577 // Validate
578 if config.redis_url.is_empty() {
579 return Err(anyhow::anyhow!(
580 "Validation error: redis_url cannot be empty. \
581 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
582 ));
583 }
584 if config.stream_key.is_empty() {
585 return Err(anyhow::anyhow!(
586 "Validation error: stream_key cannot be empty. \
587 Please specify the Redis Stream key to read from"
588 ));
589 }
590 if config.consumer_group.is_empty() {
591 return Err(anyhow::anyhow!(
592 "Validation error: consumer_group cannot be empty. \
593 Please specify a consumer group name for this source"
594 ));
595 }
596 if config.consumer_name.is_empty() {
597 return Err(anyhow::anyhow!(
598 "Validation error: consumer_name cannot be empty. \
599 Please specify a unique consumer name within the consumer group"
600 ));
601 }
602
603 Ok(config)
604 }
605
606 /// Connect to Redis with retry logic
607 async fn connect_with_retry(
608 redis_url: &str,
609 max_retries: usize,
610 retry_delay_ms: u64,
611 ) -> Result<redis::aio::MultiplexedConnection> {
612 let client = redis::Client::open(redis_url)?;
613 let mut delay = retry_delay_ms;
614
615 for attempt in 0..max_retries {
616 match client.get_multiplexed_async_connection().await {
617 Ok(conn) => {
618 info!("Successfully connected to Redis");
619 return Ok(conn);
620 }
621 Err(e) if attempt < max_retries - 1 => {
622 warn!(
623 "Redis connection failed (attempt {}/{}): {}",
624 attempt + 1,
625 max_retries,
626 e
627 );
628 tokio::time::sleep(Duration::from_millis(delay)).await;
629 delay *= 2; // Exponential backoff
630 }
631 Err(e) => {
632 return Err(anyhow::anyhow!(
633 "Failed to connect to Redis after {max_retries} attempts: {e}"
634 ));
635 }
636 }
637 }
638
639 unreachable!()
640 }
641
642 /// Create or recreate consumer group based on configuration
643 async fn create_consumer_group(
644 conn: &mut redis::aio::MultiplexedConnection,
645 stream_key: &str,
646 consumer_group: &str,
647 start_id: &str,
648 always_create: bool,
649 ) -> Result<()> {
650 // Determine the initial position for the consumer group
651 let group_start_id = if start_id == ">" {
652 "$" // ">" means only new messages, so create group at end
653 } else {
654 start_id // "0" or specific ID
655 };
656
657 // If always_create is true, delete the existing group first
658 if always_create {
659 info!(
660 "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
661 );
662
663 let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
664 .arg("DESTROY")
665 .arg(stream_key)
666 .arg(consumer_group)
667 .query_async(conn)
668 .await;
669
670 match destroy_result {
671 Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
672 Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
673 Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
674 Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
675 }
676 }
677
678 // Try to create the consumer group
679 let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
680 .arg("CREATE")
681 .arg(stream_key)
682 .arg(consumer_group)
683 .arg(group_start_id)
684 .arg("MKSTREAM")
685 .query_async(conn)
686 .await;
687
688 match result {
689 Ok(_) => {
690 info!(
691 "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
692 );
693 Ok(())
694 }
695 Err(e) => {
696 // BUSYGROUP error means the group already exists
697 if e.to_string().contains("BUSYGROUP") {
698 info!(
699 "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
700 );
701 Ok(())
702 } else {
703 Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
704 }
705 }
706 }
707 }
708
709 /// Start the stream consumer task
710 async fn start_consumer_task(
711 source_id: String,
712 instance_id: String,
713 platform_config: PlatformConfig,
714 dispatchers: Arc<
715 RwLock<
716 Vec<
717 Box<
718 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
719 >,
720 >,
721 >,
722 >,
723 event_tx: Arc<RwLock<Option<ComponentEventSender>>>,
724 status: Arc<RwLock<ComponentStatus>>,
725 ) -> JoinHandle<()> {
726 let source_id_for_span = source_id.clone();
727 let span = tracing::info_span!(
728 "platform_source_consumer",
729 instance_id = %instance_id,
730 component_id = %source_id_for_span,
731 component_type = "source"
732 );
733 tokio::spawn(async move {
734 info!(
735 "Starting platform source consumer for source '{}' on stream '{}'",
736 source_id, platform_config.stream_key
737 );
738
739 // Connect to Redis
740 let mut conn = match Self::connect_with_retry(
741 &platform_config.redis_url,
742 platform_config.max_retries,
743 platform_config.retry_delay_ms,
744 )
745 .await
746 {
747 Ok(conn) => conn,
748 Err(e) => {
749 error!("Failed to connect to Redis: {e}");
750 if let Some(ref tx) = *event_tx.read().await {
751 let _ = tx
752 .send(ComponentEvent {
753 component_id: source_id.clone(),
754 component_type: ComponentType::Source,
755 status: ComponentStatus::Stopped,
756 timestamp: chrono::Utc::now(),
757 message: Some(format!("Failed to connect to Redis: {e}")),
758 })
759 .await;
760 }
761 *status.write().await = ComponentStatus::Stopped;
762 return;
763 }
764 };
765
766 // Create consumer group
767 if let Err(e) = Self::create_consumer_group(
768 &mut conn,
769 &platform_config.stream_key,
770 &platform_config.consumer_group,
771 &platform_config.start_id,
772 platform_config.always_create_consumer_group,
773 )
774 .await
775 {
776 error!("Failed to create consumer group: {e}");
777 if let Some(ref tx) = *event_tx.read().await {
778 let _ = tx
779 .send(ComponentEvent {
780 component_id: source_id.clone(),
781 component_type: ComponentType::Source,
782 status: ComponentStatus::Stopped,
783 timestamp: chrono::Utc::now(),
784 message: Some(format!("Failed to create consumer group: {e}")),
785 })
786 .await;
787 }
788 *status.write().await = ComponentStatus::Stopped;
789 return;
790 }
791
792 // Main consumer loop
793 loop {
794 // Read from stream using ">" to get next undelivered messages for this consumer group
795 let read_result: Result<StreamReadReply, redis::RedisError> =
796 redis::cmd("XREADGROUP")
797 .arg("GROUP")
798 .arg(&platform_config.consumer_group)
799 .arg(&platform_config.consumer_name)
800 .arg("COUNT")
801 .arg(platform_config.batch_size)
802 .arg("BLOCK")
803 .arg(platform_config.block_ms)
804 .arg("STREAMS")
805 .arg(&platform_config.stream_key)
806 .arg(">") // Always use ">" for consumer group reads
807 .query_async(&mut conn)
808 .await;
809
810 match read_result {
811 Ok(reply) => {
812 // Collect all stream IDs for batch acknowledgment
813 let mut all_stream_ids = Vec::new();
814
815 // Process each stream entry
816 for stream_key in reply.keys {
817 for stream_id in stream_key.ids {
818 debug!("Received event from stream: {}", stream_id.id);
819
820 // Store stream ID for batch acknowledgment
821 all_stream_ids.push(stream_id.id.clone());
822
823 // Extract event data
824 match extract_event_data(&stream_id.map) {
825 Ok(event_json) => {
826 // Parse JSON
827 match serde_json::from_str::<Value>(&event_json) {
828 Ok(cloud_event) => {
829 // Detect message type
830 let message_type =
831 detect_message_type(&cloud_event);
832
833 match message_type {
834 MessageType::Control(control_type) => {
835 // Handle control message
836 debug!(
837 "Detected control message of type: {control_type}"
838 );
839
840 match transform_control_event(
841 cloud_event,
842 &control_type,
843 ) {
844 Ok(control_events) => {
845 // Publish control events
846 for control_event in control_events
847 {
848 // Create profiling metadata with timestamps
849 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
850 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
851
852 let wrapper = SourceEventWrapper::with_profiling(
853 source_id.clone(),
854 SourceEvent::Control(control_event),
855 chrono::Utc::now(),
856 profiling,
857 );
858
859 // Dispatch via helper
860 if let Err(e) = SourceBase::dispatch_from_task(
861 dispatchers.clone(),
862 wrapper,
863 &source_id,
864 )
865 .await
866 {
867 debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
868 } else {
869 debug!(
870 "Published control event for stream {}",
871 stream_id.id
872 );
873 }
874 }
875 }
876 Err(e) => {
877 warn!(
878 "Failed to transform control event {}: {}",
879 stream_id.id, e
880 );
881 }
882 }
883 }
884 MessageType::Data => {
885 // Handle data message
886 match transform_platform_event(
887 cloud_event,
888 &source_id,
889 ) {
890 Ok(source_changes_with_timestamps) => {
891 // Publish source changes
892 for item in
893 source_changes_with_timestamps
894 {
895 // Create profiling metadata with timestamps
896 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
897 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
898
899 // Extract source_ns from SourceChange transaction time
900 profiling.source_ns = Some(
901 item.source_change
902 .get_transaction_time(),
903 );
904
905 // Set reactivator timestamps from event
906 profiling
907 .reactivator_start_ns =
908 item.reactivator_start_ns;
909 profiling.reactivator_end_ns =
910 item.reactivator_end_ns;
911
912 let wrapper = SourceEventWrapper::with_profiling(
913 source_id.clone(),
914 SourceEvent::Change(item.source_change),
915 chrono::Utc::now(),
916 profiling,
917 );
918
919 // Dispatch via helper
920 if let Err(e) = SourceBase::dispatch_from_task(
921 dispatchers.clone(),
922 wrapper,
923 &source_id,
924 )
925 .await
926 {
927 debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
928 } else {
929 debug!(
930 "Published source change for event {}",
931 stream_id.id
932 );
933 }
934 }
935 }
936 Err(e) => {
937 warn!(
938 "Failed to transform event {}: {}",
939 stream_id.id, e
940 );
941 if let Some(ref tx) =
942 *event_tx.read().await
943 {
944 let _ = tx
945 .send(ComponentEvent {
946 component_id: source_id.clone(),
947 component_type:
948 ComponentType::Source,
949 status: ComponentStatus::Running,
950 timestamp: chrono::Utc::now(),
951 message: Some(format!(
952 "Transformation error: {e}"
953 )),
954 })
955 .await;
956 }
957 }
958 }
959 }
960 }
961 }
962 Err(e) => {
963 warn!(
964 "Failed to parse JSON for event {}: {}",
965 stream_id.id, e
966 );
967 }
968 }
969 }
970 Err(e) => {
971 warn!(
972 "Failed to extract event data from {}: {}",
973 stream_id.id, e
974 );
975 }
976 }
977 }
978 }
979
980 // Batch acknowledge all messages at once
981 if !all_stream_ids.is_empty() {
982 debug!("Acknowledging batch of {} messages", all_stream_ids.len());
983
984 let mut cmd = redis::cmd("XACK");
985 cmd.arg(&platform_config.stream_key)
986 .arg(&platform_config.consumer_group);
987
988 // Add all stream IDs to the command
989 for stream_id in &all_stream_ids {
990 cmd.arg(stream_id);
991 }
992
993 match cmd.query_async::<_, i64>(&mut conn).await {
994 Ok(ack_count) => {
995 debug!("Successfully acknowledged {ack_count} messages");
996 if ack_count as usize != all_stream_ids.len() {
997 warn!(
998 "Acknowledged {} messages but expected {}",
999 ack_count,
1000 all_stream_ids.len()
1001 );
1002 }
1003 }
1004 Err(e) => {
1005 error!("Failed to acknowledge message batch: {e}");
1006
1007 // Fallback: Try acknowledging messages individually
1008 warn!("Falling back to individual acknowledgments");
1009 for stream_id in &all_stream_ids {
1010 match redis::cmd("XACK")
1011 .arg(&platform_config.stream_key)
1012 .arg(&platform_config.consumer_group)
1013 .arg(stream_id)
1014 .query_async::<_, i64>(&mut conn)
1015 .await
1016 {
1017 Ok(_) => {
1018 debug!(
1019 "Individually acknowledged message {stream_id}"
1020 );
1021 }
1022 Err(e) => {
1023 error!(
1024 "Failed to individually acknowledge message {stream_id}: {e}"
1025 );
1026 }
1027 }
1028 }
1029 }
1030 }
1031 }
1032 }
1033 Err(e) => {
1034 // Check if it's a connection error
1035 if is_connection_error(&e) {
1036 error!("Redis connection lost: {e}");
1037 if let Some(ref tx) = *event_tx.read().await {
1038 let _ = tx
1039 .send(ComponentEvent {
1040 component_id: source_id.clone(),
1041 component_type: ComponentType::Source,
1042 status: ComponentStatus::Running,
1043 timestamp: chrono::Utc::now(),
1044 message: Some(format!("Redis connection lost: {e}")),
1045 })
1046 .await;
1047 }
1048
1049 // Try to reconnect
1050 match Self::connect_with_retry(
1051 &platform_config.redis_url,
1052 platform_config.max_retries,
1053 platform_config.retry_delay_ms,
1054 )
1055 .await
1056 {
1057 Ok(new_conn) => {
1058 conn = new_conn;
1059 info!("Reconnected to Redis");
1060 }
1061 Err(e) => {
1062 error!("Failed to reconnect to Redis: {e}");
1063 *status.write().await = ComponentStatus::Stopped;
1064 return;
1065 }
1066 }
1067 } else if !e.to_string().contains("timeout") {
1068 // Log non-timeout errors
1069 error!("Error reading from stream: {e}");
1070 }
1071 }
1072 }
1073 }
1074 }.instrument(span))
1075 }
1076}
1077
1078#[async_trait::async_trait]
1079impl Source for PlatformSource {
1080 fn id(&self) -> &str {
1081 &self.base.id
1082 }
1083
1084 fn type_name(&self) -> &str {
1085 "platform"
1086 }
1087
1088 fn properties(&self) -> HashMap<String, serde_json::Value> {
1089 let mut props = HashMap::new();
1090 props.insert(
1091 "redis_url".to_string(),
1092 serde_json::Value::String(self.config.redis_url.clone()),
1093 );
1094 props.insert(
1095 "stream_key".to_string(),
1096 serde_json::Value::String(self.config.stream_key.clone()),
1097 );
1098 props.insert(
1099 "consumer_group".to_string(),
1100 serde_json::Value::String(self.config.consumer_group.clone()),
1101 );
1102 if let Some(ref consumer_name) = self.config.consumer_name {
1103 props.insert(
1104 "consumer_name".to_string(),
1105 serde_json::Value::String(consumer_name.clone()),
1106 );
1107 }
1108 props.insert(
1109 "batch_size".to_string(),
1110 serde_json::Value::Number(self.config.batch_size.into()),
1111 );
1112 props.insert(
1113 "block_ms".to_string(),
1114 serde_json::Value::Number(self.config.block_ms.into()),
1115 );
1116 props
1117 }
1118
1119 fn auto_start(&self) -> bool {
1120 self.base.get_auto_start()
1121 }
1122
1123 async fn start(&self) -> Result<()> {
1124 info!("Starting platform source: {}", self.base.id);
1125
1126 // Extract configuration from typed config
1127 let platform_config = PlatformConfig {
1128 redis_url: self.config.redis_url.clone(),
1129 stream_key: self.config.stream_key.clone(),
1130 consumer_group: self.config.consumer_group.clone(),
1131 consumer_name: self
1132 .config
1133 .consumer_name
1134 .clone()
1135 .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1136 batch_size: self.config.batch_size,
1137 block_ms: self.config.block_ms,
1138 start_id: ">".to_string(),
1139 always_create_consumer_group: false,
1140 max_retries: 5,
1141 retry_delay_ms: 1000,
1142 };
1143
1144 // Update status
1145 *self.base.status.write().await = ComponentStatus::Running;
1146
1147 // Get instance_id from context for log routing isolation
1148 let instance_id = self
1149 .base
1150 .context()
1151 .await
1152 .map(|c| c.instance_id)
1153 .unwrap_or_default();
1154
1155 // Start consumer task
1156 let task = Self::start_consumer_task(
1157 self.base.id.clone(),
1158 instance_id,
1159 platform_config,
1160 self.base.dispatchers.clone(),
1161 self.base.status_tx(),
1162 self.base.status.clone(),
1163 )
1164 .await;
1165
1166 *self.base.task_handle.write().await = Some(task);
1167
1168 Ok(())
1169 }
1170
1171 async fn stop(&self) -> Result<()> {
1172 info!("Stopping platform source: {}", self.base.id);
1173
1174 // Cancel the task
1175 if let Some(handle) = self.base.task_handle.write().await.take() {
1176 handle.abort();
1177 }
1178
1179 // Update status
1180 *self.base.status.write().await = ComponentStatus::Stopped;
1181
1182 Ok(())
1183 }
1184
1185 async fn status(&self) -> ComponentStatus {
1186 self.base.status.read().await.clone()
1187 }
1188
1189 async fn subscribe(
1190 &self,
1191 settings: drasi_lib::config::SourceSubscriptionSettings,
1192 ) -> Result<SubscriptionResponse> {
1193 self.base
1194 .subscribe_with_bootstrap(&settings, "Platform")
1195 .await
1196 }
1197
1198 fn as_any(&self) -> &dyn std::any::Any {
1199 self
1200 }
1201
1202 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1203 self.base.initialize(context).await;
1204 }
1205
1206 async fn set_bootstrap_provider(
1207 &self,
1208 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1209 ) {
1210 self.base.set_bootstrap_provider(provider).await;
1211 }
1212}
1213
1214impl PlatformSource {
1215 /// Create a test subscription to this source (synchronous)
1216 ///
1217 /// This method delegates to SourceBase and is provided for convenience in tests.
1218 /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1219 pub fn test_subscribe(
1220 &self,
1221 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1222 self.base.test_subscribe()
1223 }
1224
1225 /// Create a test subscription to this source (async)
1226 ///
1227 /// This method delegates to SourceBase and is provided for convenience in async tests.
1228 /// Prefer this method over test_subscribe() in async contexts.
1229 pub async fn test_subscribe_async(
1230 &self,
1231 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1232 self.base
1233 .create_streaming_receiver()
1234 .await
1235 .expect("Failed to create test subscription")
1236 }
1237}
1238
1239/// Extract event data from Redis stream entry
1240fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1241 // Look for common field names
1242 for key in &["data", "event", "payload", "message"] {
1243 if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1244 return String::from_utf8(data.clone())
1245 .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1246 }
1247 }
1248
1249 Err(anyhow::anyhow!(
1250 "No event data found in stream entry. Available keys: {:?}",
1251 entry_map.keys().collect::<Vec<_>>()
1252 ))
1253}
1254
1255/// Check if error is a connection error
1256fn is_connection_error(e: &redis::RedisError) -> bool {
1257 e.is_connection_dropped()
1258 || e.is_io_error()
1259 || e.to_string().contains("connection")
1260 || e.to_string().contains("EOF")
1261}
1262
/// Message type discriminator.
///
/// Equality is total for this type (its only payload is a `String`), so `Eq` is
/// derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageType {
    /// Control message; carries the control type taken from `source.table`.
    Control(String),
    /// Data message (node/relation change).
    Data,
}
1271
1272/// Detect message type based on payload.source.db field
1273///
1274/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1275/// Returns MessageType::Data for all other cases
1276fn detect_message_type(cloud_event: &Value) -> MessageType {
1277 // Extract data array and get first event to check message type
1278 let data_array = match cloud_event["data"].as_array() {
1279 Some(arr) if !arr.is_empty() => arr,
1280 _ => return MessageType::Data, // Default to data if no data array
1281 };
1282
1283 // Check the first event's source.db field
1284 let first_event = &data_array[0];
1285 let source_db = first_event["payload"]["source"]["db"]
1286 .as_str()
1287 .unwrap_or("");
1288
1289 // Case-insensitive comparison with "Drasi"
1290 if source_db.eq_ignore_ascii_case("drasi") {
1291 // Extract source.table to determine control type
1292 let control_type = first_event["payload"]["source"]["table"]
1293 .as_str()
1294 .unwrap_or("Unknown")
1295 .to_string();
1296 MessageType::Control(control_type)
1297 } else {
1298 MessageType::Data
1299 }
1300}
1301
/// Helper struct to hold SourceChange along with reactivator timestamps
///
/// One value is produced per entry of a CloudEvent `data` array by
/// `transform_platform_event`.
#[derive(Debug)]
struct SourceChangeWithTimestamps {
    /// The insert, update or delete built from the event.
    source_change: SourceChange,
    /// Value of the event's top-level `reactivatorStart_ns` field; `None` when the
    /// field is absent or not an unsigned integer. Presumably nanoseconds, going by
    /// the `_ns` suffix — TODO confirm against the producer.
    reactivator_start_ns: Option<u64>,
    /// Value of the event's top-level `reactivatorEnd_ns` field; `None` when the
    /// field is absent or not an unsigned integer. Presumably nanoseconds, going by
    /// the `_ns` suffix — TODO confirm against the producer.
    reactivator_end_ns: Option<u64>,
}
1309
1310/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1311///
1312/// Handles events in CloudEvent format with a data array containing change events.
1313/// Each event in the data array has: op, payload.after/before, payload.source
1314fn transform_platform_event(
1315 cloud_event: Value,
1316 source_id: &str,
1317) -> Result<Vec<SourceChangeWithTimestamps>> {
1318 let mut source_changes = Vec::new();
1319
1320 // Extract the data array from CloudEvent wrapper
1321 let data_array = cloud_event["data"]
1322 .as_array()
1323 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1324
1325 // Process each event in the data array
1326 for event in data_array {
1327 // Extract reactivator timestamps from top-level event fields
1328 let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1329 let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1330
1331 // Extract operation type (op field: "i", "u", "d")
1332 let op = event["op"]
1333 .as_str()
1334 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1335
1336 // Extract payload
1337 let payload = &event["payload"];
1338 if payload.is_null() {
1339 return Err(anyhow::anyhow!("Missing 'payload' field"));
1340 }
1341
1342 // Extract element type from payload.source.table
1343 let element_type = payload["source"]["table"]
1344 .as_str()
1345 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1346
1347 // Get element data based on operation
1348 let element_data = match op {
1349 "i" | "u" => &payload["after"],
1350 "d" => &payload["before"],
1351 _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1352 };
1353
1354 if element_data.is_null() {
1355 return Err(anyhow::anyhow!(
1356 "Missing element data (after/before) for operation {op}"
1357 ));
1358 }
1359
1360 // Extract element ID
1361 let element_id = element_data["id"]
1362 .as_str()
1363 .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1364
1365 // Extract labels
1366 let labels_array = element_data["labels"]
1367 .as_array()
1368 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1369
1370 let labels: Vec<Arc<str>> = labels_array
1371 .iter()
1372 .filter_map(|v| v.as_str().map(Arc::from))
1373 .collect();
1374
1375 if labels.is_empty() {
1376 return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1377 }
1378
1379 // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1380 let ts_ns = payload["source"]["ts_ns"]
1381 .as_u64()
1382 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1383 let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1384
1385 // Build ElementMetadata
1386 let reference = ElementReference::new(source_id, element_id);
1387 let metadata = ElementMetadata {
1388 reference,
1389 labels: labels.into(),
1390 effective_from,
1391 };
1392
1393 // Handle delete operation (no properties needed)
1394 if op == "d" {
1395 source_changes.push(SourceChangeWithTimestamps {
1396 source_change: SourceChange::Delete { metadata },
1397 reactivator_start_ns,
1398 reactivator_end_ns,
1399 });
1400 continue;
1401 }
1402
1403 // Convert properties
1404 let properties_obj = element_data["properties"]
1405 .as_object()
1406 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1407
1408 let properties = convert_json_to_element_properties(properties_obj)?;
1409
1410 // Build element based on type
1411 let element = match element_type {
1412 "node" => Element::Node {
1413 metadata,
1414 properties,
1415 },
1416 "rel" | "relation" => {
1417 // Extract startId and endId
1418 let start_id = element_data["startId"]
1419 .as_str()
1420 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1421 let end_id = element_data["endId"]
1422 .as_str()
1423 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1424
1425 Element::Relation {
1426 metadata,
1427 properties,
1428 out_node: ElementReference::new(source_id, start_id),
1429 in_node: ElementReference::new(source_id, end_id),
1430 }
1431 }
1432 _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1433 };
1434
1435 // Build SourceChange
1436 let source_change = match op {
1437 "i" => SourceChange::Insert { element },
1438 "u" => SourceChange::Update { element },
1439 _ => unreachable!(),
1440 };
1441
1442 source_changes.push(SourceChangeWithTimestamps {
1443 source_change,
1444 reactivator_start_ns,
1445 reactivator_end_ns,
1446 });
1447 }
1448
1449 Ok(source_changes)
1450}
1451
1452/// Transform CloudEvent-wrapped control event to SourceControl(s)
1453///
1454/// Handles control messages from Query API service with source.db = "Drasi"
1455/// Currently supports "SourceSubscription" control type
1456fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1457 let mut control_events = Vec::new();
1458
1459 // Extract the data array from CloudEvent wrapper
1460 let data_array = cloud_event["data"]
1461 .as_array()
1462 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1463
1464 // Check if control type is supported
1465 if control_type != "SourceSubscription" {
1466 info!(
1467 "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1468 );
1469 return Ok(control_events); // Return empty vec for unknown types
1470 }
1471
1472 // Process each event in the data array
1473 for event in data_array {
1474 // Extract operation type (op field: "i", "u", "d")
1475 let op = event["op"]
1476 .as_str()
1477 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1478
1479 // Extract payload
1480 let payload = &event["payload"];
1481 if payload.is_null() {
1482 warn!("Missing 'payload' field in control event, skipping");
1483 continue;
1484 }
1485
1486 // Get data based on operation
1487 let control_data = match op {
1488 "i" | "u" => &payload["after"],
1489 "d" => &payload["before"],
1490 _ => {
1491 warn!("Unknown operation type in control event: {op}, skipping");
1492 continue;
1493 }
1494 };
1495
1496 if control_data.is_null() {
1497 warn!("Missing control data (after/before) for operation {op}, skipping");
1498 continue;
1499 }
1500
1501 // Extract required fields for SourceSubscription
1502 let query_id = match control_data["queryId"].as_str() {
1503 Some(id) => id.to_string(),
1504 None => {
1505 warn!("Missing required 'queryId' field in control event, skipping");
1506 continue;
1507 }
1508 };
1509
1510 let query_node_id = match control_data["queryNodeId"].as_str() {
1511 Some(id) => id.to_string(),
1512 None => {
1513 warn!("Missing required 'queryNodeId' field in control event, skipping");
1514 continue;
1515 }
1516 };
1517
1518 // Extract optional label arrays (default to empty if missing)
1519 let node_labels = control_data["nodeLabels"]
1520 .as_array()
1521 .map(|arr| {
1522 arr.iter()
1523 .filter_map(|v| v.as_str().map(String::from))
1524 .collect()
1525 })
1526 .unwrap_or_default();
1527
1528 let rel_labels = control_data["relLabels"]
1529 .as_array()
1530 .map(|arr| {
1531 arr.iter()
1532 .filter_map(|v| v.as_str().map(String::from))
1533 .collect()
1534 })
1535 .unwrap_or_default();
1536
1537 // Map operation to ControlOperation
1538 let operation = match op {
1539 "i" => ControlOperation::Insert,
1540 "u" => ControlOperation::Update,
1541 "d" => ControlOperation::Delete,
1542 _ => unreachable!(), // Already filtered above
1543 };
1544
1545 // Build SourceControl::Subscription
1546 let control_event = SourceControl::Subscription {
1547 query_id,
1548 query_node_id,
1549 node_labels,
1550 rel_labels,
1551 operation,
1552 };
1553
1554 control_events.push(control_event);
1555 }
1556
1557 Ok(control_events)
1558}