drasi_source_platform/lib.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unexpected_cfgs)]
16
17//! Platform Source Plugin for Drasi
18//!
19//! This plugin consumes data change events from Redis Streams, which is the primary
20//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
21//! containing node and relation changes, as well as control events for query subscriptions.
22//!
23//! # Architecture
24//!
25//! The platform source connects to Redis as a consumer group member, enabling:
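//! - **At-least-once delivery**: Entries are acknowledged (`XACK`) only after the whole batch
//!   they arrived in has been processed; entries that fail to decode or transform are logged
//!   and still acknowledged, so a malformed message is not retried forever
//! - **Horizontal scaling**: Multiple consumers can share the workload
//! - **Fault tolerance**: Entries that are read but never acknowledged (e.g. the process stops
//!   mid-batch) remain in the consumer group's pending entries list; note that this source
//!   only reads new entries (`>`) and does not itself reclaim pending ones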
29//!
30//! # Configuration
31//!
32//! | Field | Type | Default | Description |
33//! |-------|------|---------|-------------|
34//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
35//! | `stream_key` | string | *required* | Redis stream key to consume from |
36//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
37//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
38//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
39//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
40//!
41//! # Data Format
42//!
43//! The platform source expects CloudEvent-wrapped messages with a `data` array
44//! containing change events. Each event includes an operation type and payload.
45//!
46//! ## Node Insert
47//!
48//! ```json
49//! {
50//! "data": [{
51//! "op": "i",
52//! "payload": {
53//! "after": {
54//! "id": "user-123",
55//! "labels": ["User"],
56//! "properties": {
57//! "name": "Alice",
58//! "email": "alice@example.com"
59//! }
60//! },
61//! "source": {
62//! "db": "mydb",
63//! "table": "node",
64//! "ts_ns": 1699900000000000000
65//! }
66//! }
67//! }]
68//! }
69//! ```
70//!
71//! ## Node Update
72//!
73//! ```json
74//! {
75//! "data": [{
76//! "op": "u",
77//! "payload": {
78//! "after": {
79//! "id": "user-123",
80//! "labels": ["User"],
81//! "properties": { "name": "Alice Updated" }
82//! },
83//! "source": { "table": "node", "ts_ns": 1699900001000000000 }
84//! }
85//! }]
86//! }
87//! ```
88//!
89//! ## Node Delete
90//!
91//! ```json
92//! {
93//! "data": [{
94//! "op": "d",
95//! "payload": {
96//! "before": {
97//! "id": "user-123",
98//! "labels": ["User"],
99//! "properties": {}
100//! },
101//! "source": { "table": "node", "ts_ns": 1699900002000000000 }
102//! }
103//! }]
104//! }
105//! ```
106//!
107//! ## Relation Insert
108//!
109//! ```json
110//! {
111//! "data": [{
112//! "op": "i",
113//! "payload": {
114//! "after": {
115//! "id": "follows-1",
116//! "labels": ["FOLLOWS"],
117//! "startId": "user-123",
118//! "endId": "user-456",
119//! "properties": { "since": "2024-01-01" }
120//! },
121//! "source": { "table": "rel", "ts_ns": 1699900003000000000 }
122//! }
123//! }]
124//! }
125//! ```
126//!
127//! # Control Events
128//!
129//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
130//! Currently supported control types:
131//!
132//! - **SourceSubscription**: Query subscription management
133//!
134//! # Example Configuration (YAML)
135//!
136//! ```yaml
137//! source_type: platform
138//! properties:
139//! redis_url: "redis://localhost:6379"
140//! stream_key: "my-app-changes"
141//! consumer_group: "drasi-consumers"
142//! batch_size: 50
143//! block_ms: 10000
144//! ```
145//!
146//! # Usage Example
147//!
148//! ```rust,ignore
//! use drasi_source_platform::PlatformSource;
//! use std::sync::Arc;
//!
//! let source = PlatformSource::builder("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
160//! ```
161
162pub mod config;
163pub mod descriptor;
164pub use config::PlatformSourceConfig;
165
166use anyhow::Result;
167use log::{debug, error, info, warn};
168use redis::streams::StreamReadReply;
169use serde_json::Value;
170use std::collections::HashMap;
171use std::sync::Arc;
172use std::time::Duration;
173use tokio::sync::RwLock;
174use tokio::task::JoinHandle;
175
176use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
177use drasi_lib::channels::{
178 ComponentStatus, ControlOperation, DispatchMode, SourceControl, SourceEvent,
179 SourceEventWrapper, SubscriptionResponse,
180};
181use drasi_lib::component_graph::ComponentStatusHandle;
182use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
183use drasi_lib::sources::manager::convert_json_to_element_properties;
184use drasi_lib::Source;
185use tracing::Instrument;
186
187#[cfg(test)]
188mod tests;
189
/// Internal, fully-resolved settings for the Redis Streams consumer task.
///
/// `start()` builds one of these from the typed [`PlatformSourceConfig`];
/// `parse_config` can build one from an untyped property map. The value is
/// moved into the spawned consumer task.
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// Redis connection URL, e.g. `redis://localhost:6379`.
    redis_url: String,
    /// Key of the Redis stream that is consumed.
    stream_key: String,
    /// Name of the consumer group the task reads through.
    consumer_group: String,
    /// Name of this consumer inside the group; should be unique per instance.
    consumer_name: String,
    /// `COUNT` argument of each `XREADGROUP` call.
    batch_size: usize,
    /// `BLOCK` argument (milliseconds) of each `XREADGROUP` call.
    block_ms: u64,
    /// Position used when the consumer group is created: `">"` is translated
    /// to `"$"` (only new entries); any other value (e.g. `"0"`) is used as-is.
    start_id: String,
    /// When `true`, the consumer group is destroyed and recreated at
    /// `start_id` on startup. When `false`, an existing group keeps its
    /// position and `start_id` only matters if the group has to be created.
    always_create_consumer_group: bool,
    /// Number of connection attempts before giving up.
    max_retries: usize,
    /// Delay before the first connection retry, in milliseconds; doubled
    /// after every further failed attempt.
    retry_delay_ms: u64,
}

impl Default for PlatformConfig {
    /// Empty connection/identity strings plus conservative consumer settings.
    ///
    /// Note: `batch_size` defaults to 10 here, while the public builder and the
    /// module-level docs use 100. In this file these defaults are only read by
    /// `parse_config`.
    fn default() -> Self {
        Self {
            start_id: String::from(">"),
            batch_size: 10,
            block_ms: 5_000,
            max_retries: 3,
            retry_delay_ms: 1_000,
            always_create_consumer_group: false,
            redis_url: String::default(),
            stream_key: String::default(),
            consumer_group: String::default(),
            consumer_name: String::default(),
        }
    }
}
233
/// Platform source that reads events from Redis Streams.
///
/// This source connects to a Redis instance and consumes CloudEvent-wrapped
/// messages from a stream using consumer groups. Each entry is turned into
/// either a data change (`SourceEvent::Change`, node/relation changes) or a
/// control event (`SourceEvent::Control`, e.g. query subscriptions) and handed
/// to the source's dispatchers.
///
/// Construct it with [`PlatformSource::builder`] or [`PlatformSource::new`].
///
/// # Fields
///
/// - `base`: shared source plumbing (id, dispatchers, status, consumer task handle, lifecycle)
/// - `config`: user-facing Redis/stream settings, resolved into the internal
///   consumer configuration each time the source is started
pub struct PlatformSource {
    /// Shared source implementation (id, dispatchers, status, task handle)
    base: SourceBase,
    /// User-supplied settings; read by `start()` and reported by `properties()`
    config: PlatformSourceConfig,
}
250
/// Builder for creating [`PlatformSource`] instances.
///
/// Provides a fluent API for constructing platform sources. Options that are
/// never set fall back to the defaults applied by `build()`: consumer group
/// `"drasi-core"`, batch size 100 and a 5000 ms block timeout.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_platform::PlatformSource;
///
/// let source = PlatformSource::builder("my-platform-source")
///     .with_redis_url("redis://localhost:6379")
///     .with_stream_key("my-app-changes")
///     .with_consumer_group("my-consumers")
///     .with_batch_size(50)
///     .build()?;
/// ```
pub struct PlatformSourceBuilder {
    /// Unique source id, passed to `SourceBaseParams::new`
    id: String,
    /// Redis connection URL; empty until `with_redis_url` is called
    redis_url: String,
    /// Redis stream key; empty until `with_stream_key` is called
    stream_key: String,
    /// Consumer group name; `None` means `"drasi-core"`
    consumer_group: Option<String>,
    /// Consumer name; `None` lets `start()` derive `drasi-consumer-<id>`
    consumer_name: Option<String>,
    /// `XREADGROUP` COUNT; `None` means 100
    batch_size: Option<usize>,
    /// `XREADGROUP` BLOCK in milliseconds; `None` means 5000
    block_ms: Option<u64>,
    /// Dispatch mode forwarded to `SourceBaseParams` when set
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity forwarded to `SourceBaseParams` when set
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider forwarded to `SourceBaseParams` when set
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether DrasiLib should start this source automatically (default `true`)
    auto_start: bool,
}
281
282impl PlatformSourceBuilder {
283 /// Create a new builder with the given ID and default values.
284 pub fn new(id: impl Into<String>) -> Self {
285 Self {
286 id: id.into(),
287 redis_url: String::new(),
288 stream_key: String::new(),
289 consumer_group: None,
290 consumer_name: None,
291 batch_size: None,
292 block_ms: None,
293 dispatch_mode: None,
294 dispatch_buffer_capacity: None,
295 bootstrap_provider: None,
296 auto_start: true,
297 }
298 }
299
300 /// Set the Redis connection URL.
301 ///
302 /// # Arguments
303 ///
304 /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
305 pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
306 self.redis_url = url.into();
307 self
308 }
309
310 /// Set the Redis stream key to consume from.
311 ///
312 /// # Arguments
313 ///
314 /// * `key` - Name of the Redis stream
315 pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
316 self.stream_key = key.into();
317 self
318 }
319
320 /// Set the consumer group name.
321 ///
322 /// # Arguments
323 ///
324 /// * `group` - Consumer group name (default: `"drasi-core"`)
325 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
326 self.consumer_group = Some(group.into());
327 self
328 }
329
330 /// Set a unique consumer name within the group.
331 ///
332 /// # Arguments
333 ///
334 /// * `name` - Unique consumer name (auto-generated if not specified)
335 pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
336 self.consumer_name = Some(name.into());
337 self
338 }
339
340 /// Set the batch size for reading events.
341 ///
342 /// # Arguments
343 ///
344 /// * `size` - Number of events to read per XREADGROUP call (default: 100)
345 pub fn with_batch_size(mut self, size: usize) -> Self {
346 self.batch_size = Some(size);
347 self
348 }
349
350 /// Set the block timeout for waiting on events.
351 ///
352 /// # Arguments
353 ///
354 /// * `ms` - Milliseconds to block waiting for events (default: 5000)
355 pub fn with_block_ms(mut self, ms: u64) -> Self {
356 self.block_ms = Some(ms);
357 self
358 }
359
360 /// Set the dispatch mode for this source
361 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
362 self.dispatch_mode = Some(mode);
363 self
364 }
365
366 /// Set the dispatch buffer capacity for this source
367 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
368 self.dispatch_buffer_capacity = Some(capacity);
369 self
370 }
371
372 /// Set the bootstrap provider for this source
373 pub fn with_bootstrap_provider(
374 mut self,
375 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
376 ) -> Self {
377 self.bootstrap_provider = Some(Box::new(provider));
378 self
379 }
380
381 /// Set whether this source should auto-start when DrasiLib starts.
382 ///
383 /// Default is `true`. Set to `false` if this source should only be
384 /// started manually via `start_source()`.
385 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
386 self.auto_start = auto_start;
387 self
388 }
389
390 /// Set the full configuration at once
391 pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
392 self.redis_url = config.redis_url;
393 self.stream_key = config.stream_key;
394 self.consumer_group = Some(config.consumer_group);
395 self.consumer_name = config.consumer_name;
396 self.batch_size = Some(config.batch_size);
397 self.block_ms = Some(config.block_ms);
398 self
399 }
400
401 /// Build the platform source.
402 ///
403 /// # Errors
404 ///
405 /// Returns an error if the source cannot be constructed.
406 pub fn build(self) -> Result<PlatformSource> {
407 let config = PlatformSourceConfig {
408 redis_url: self.redis_url,
409 stream_key: self.stream_key,
410 consumer_group: self
411 .consumer_group
412 .unwrap_or_else(|| "drasi-core".to_string()),
413 consumer_name: self.consumer_name,
414 batch_size: self.batch_size.unwrap_or(100),
415 block_ms: self.block_ms.unwrap_or(5000),
416 };
417
418 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
419 if let Some(mode) = self.dispatch_mode {
420 params = params.with_dispatch_mode(mode);
421 }
422 if let Some(capacity) = self.dispatch_buffer_capacity {
423 params = params.with_dispatch_buffer_capacity(capacity);
424 }
425 if let Some(provider) = self.bootstrap_provider {
426 params = params.with_bootstrap_provider(provider);
427 }
428
429 Ok(PlatformSource {
430 base: SourceBase::new(params)?,
431 config,
432 })
433 }
434}
435
436impl PlatformSource {
437 /// Create a builder for PlatformSource
438 ///
439 /// # Example
440 ///
441 /// ```rust,ignore
442 /// use drasi_source_platform::PlatformSource;
443 ///
444 /// let source = PlatformSource::builder("my-platform-source")
445 /// .with_redis_url("redis://localhost:6379")
446 /// .with_stream_key("my-changes")
447 /// .with_bootstrap_provider(my_provider)
448 /// .build()?;
449 /// ```
450 pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
451 PlatformSourceBuilder::new(id)
452 }
453
454 /// Create a new platform source.
455 ///
456 /// The event channel is automatically injected when the source is added
457 /// to DrasiLib via `add_source()`.
458 ///
459 /// # Arguments
460 ///
461 /// * `id` - Unique identifier for this source instance
462 /// * `config` - Platform source configuration
463 ///
464 /// # Returns
465 ///
466 /// A new `PlatformSource` instance, or an error if construction fails.
467 ///
468 /// # Errors
469 ///
470 /// Returns an error if the base source cannot be initialized.
471 ///
472 /// # Example
473 ///
474 /// ```rust,ignore
475 /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
476 ///
477 /// let config = PlatformSourceBuilder::new("my-platform-source")
478 /// .with_redis_url("redis://localhost:6379")
479 /// .with_stream_key("my-changes")
480 /// .build()?;
481 /// ```
482 pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
483 let id = id.into();
484 let params = SourceBaseParams::new(id);
485 Ok(Self {
486 base: SourceBase::new(params)?,
487 config,
488 })
489 }
490
491 /// Subscribe to source events (for testing)
492 ///
493 /// This method is intended for use in tests to receive events broadcast by the source.
494 /// In production, queries subscribe to sources through the SourceManager.
495 /// Parse configuration from properties
496 #[allow(dead_code)]
497 fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
498 // Extract required fields first
499 let redis_url = properties
500 .get("redis_url")
501 .and_then(|v| v.as_str())
502 .ok_or_else(|| {
503 anyhow::anyhow!(
504 "Configuration error: Missing required field 'redis_url'. \
505 Platform source requires a Redis connection URL"
506 )
507 })?
508 .to_string();
509
510 let stream_key = properties
511 .get("stream_key")
512 .and_then(|v| v.as_str())
513 .ok_or_else(|| {
514 anyhow::anyhow!(
515 "Configuration error: Missing required field 'stream_key'. \
516 Platform source requires a Redis Stream key to read from"
517 )
518 })?
519 .to_string();
520
521 let consumer_group = properties
522 .get("consumer_group")
523 .and_then(|v| v.as_str())
524 .ok_or_else(|| {
525 anyhow::anyhow!(
526 "Configuration error: Missing required field 'consumer_group'. \
527 Platform source requires a consumer group name"
528 )
529 })?
530 .to_string();
531
532 let consumer_name = properties
533 .get("consumer_name")
534 .and_then(|v| v.as_str())
535 .ok_or_else(|| {
536 anyhow::anyhow!(
537 "Configuration error: Missing required field 'consumer_name'. \
538 Platform source requires a unique consumer name"
539 )
540 })?
541 .to_string();
542
543 // Get defaults for optional field handling
544 let defaults = PlatformConfig::default();
545
546 // Build config with optional fields
547 let config = PlatformConfig {
548 redis_url,
549 stream_key,
550 consumer_group,
551 consumer_name,
552 batch_size: properties
553 .get("batch_size")
554 .and_then(|v| v.as_u64())
555 .map(|v| v as usize)
556 .unwrap_or(defaults.batch_size),
557 block_ms: properties
558 .get("block_ms")
559 .and_then(|v| v.as_u64())
560 .unwrap_or(defaults.block_ms),
561 start_id: properties
562 .get("start_id")
563 .and_then(|v| v.as_str())
564 .map(|s| s.to_string())
565 .unwrap_or(defaults.start_id),
566 always_create_consumer_group: properties
567 .get("always_create_consumer_group")
568 .and_then(|v| v.as_bool())
569 .unwrap_or(defaults.always_create_consumer_group),
570 max_retries: properties
571 .get("max_retries")
572 .and_then(|v| v.as_u64())
573 .map(|v| v as usize)
574 .unwrap_or(defaults.max_retries),
575 retry_delay_ms: properties
576 .get("retry_delay_ms")
577 .and_then(|v| v.as_u64())
578 .unwrap_or(defaults.retry_delay_ms),
579 };
580
581 // Validate
582 if config.redis_url.is_empty() {
583 return Err(anyhow::anyhow!(
584 "Validation error: redis_url cannot be empty. \
585 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
586 ));
587 }
588 if config.stream_key.is_empty() {
589 return Err(anyhow::anyhow!(
590 "Validation error: stream_key cannot be empty. \
591 Please specify the Redis Stream key to read from"
592 ));
593 }
594 if config.consumer_group.is_empty() {
595 return Err(anyhow::anyhow!(
596 "Validation error: consumer_group cannot be empty. \
597 Please specify a consumer group name for this source"
598 ));
599 }
600 if config.consumer_name.is_empty() {
601 return Err(anyhow::anyhow!(
602 "Validation error: consumer_name cannot be empty. \
603 Please specify a unique consumer name within the consumer group"
604 ));
605 }
606
607 Ok(config)
608 }
609
610 /// Connect to Redis with retry logic
611 async fn connect_with_retry(
612 redis_url: &str,
613 max_retries: usize,
614 retry_delay_ms: u64,
615 ) -> Result<redis::aio::MultiplexedConnection> {
616 let client = redis::Client::open(redis_url)?;
617 let mut delay = retry_delay_ms;
618
619 for attempt in 0..max_retries {
620 match client.get_multiplexed_async_connection().await {
621 Ok(conn) => {
622 info!("Successfully connected to Redis");
623 return Ok(conn);
624 }
625 Err(e) if attempt < max_retries - 1 => {
626 warn!(
627 "Redis connection failed (attempt {}/{}): {}",
628 attempt + 1,
629 max_retries,
630 e
631 );
632 tokio::time::sleep(Duration::from_millis(delay)).await;
633 delay *= 2; // Exponential backoff
634 }
635 Err(e) => {
636 return Err(anyhow::anyhow!(
637 "Failed to connect to Redis after {max_retries} attempts: {e}"
638 ));
639 }
640 }
641 }
642
643 unreachable!()
644 }
645
646 /// Create or recreate consumer group based on configuration
647 async fn create_consumer_group(
648 conn: &mut redis::aio::MultiplexedConnection,
649 stream_key: &str,
650 consumer_group: &str,
651 start_id: &str,
652 always_create: bool,
653 ) -> Result<()> {
654 // Determine the initial position for the consumer group
655 let group_start_id = if start_id == ">" {
656 "$" // ">" means only new messages, so create group at end
657 } else {
658 start_id // "0" or specific ID
659 };
660
661 // If always_create is true, delete the existing group first
662 if always_create {
663 info!(
664 "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
665 );
666
667 let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
668 .arg("DESTROY")
669 .arg(stream_key)
670 .arg(consumer_group)
671 .query_async(conn)
672 .await;
673
674 match destroy_result {
675 Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
676 Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
677 Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
678 Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
679 }
680 }
681
682 // Try to create the consumer group
683 let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
684 .arg("CREATE")
685 .arg(stream_key)
686 .arg(consumer_group)
687 .arg(group_start_id)
688 .arg("MKSTREAM")
689 .query_async(conn)
690 .await;
691
692 match result {
693 Ok(_) => {
694 info!(
695 "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
696 );
697 Ok(())
698 }
699 Err(e) => {
700 // BUSYGROUP error means the group already exists
701 if e.to_string().contains("BUSYGROUP") {
702 info!(
703 "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
704 );
705 Ok(())
706 } else {
707 Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
708 }
709 }
710 }
711 }
712
713 /// Start the stream consumer task
714 async fn start_consumer_task(
715 source_id: String,
716 instance_id: String,
717 platform_config: PlatformConfig,
718 dispatchers: Arc<
719 RwLock<
720 Vec<
721 Box<
722 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
723 >,
724 >,
725 >,
726 >,
727 reporter: ComponentStatusHandle,
728 ) -> JoinHandle<()> {
729 let source_id_for_span = source_id.clone();
730 let span = tracing::info_span!(
731 "platform_source_consumer",
732 instance_id = %instance_id,
733 component_id = %source_id_for_span,
734 component_type = "source"
735 );
736 tokio::spawn(async move {
737 info!(
738 "Starting platform source consumer for source '{}' on stream '{}'",
739 source_id, platform_config.stream_key
740 );
741
742 // Connect to Redis
743 let mut conn = match Self::connect_with_retry(
744 &platform_config.redis_url,
745 platform_config.max_retries,
746 platform_config.retry_delay_ms,
747 )
748 .await
749 {
750 Ok(conn) => conn,
751 Err(e) => {
752 error!("Failed to connect to Redis: {e}");
753 reporter.set_status(
754 ComponentStatus::Stopped,
755 Some(format!("Failed to connect to Redis: {e}")),
756 ).await;
757 return;
758 }
759 };
760
761 // Create consumer group
762 if let Err(e) = Self::create_consumer_group(
763 &mut conn,
764 &platform_config.stream_key,
765 &platform_config.consumer_group,
766 &platform_config.start_id,
767 platform_config.always_create_consumer_group,
768 )
769 .await
770 {
771 error!("Failed to create consumer group: {e}");
772 reporter.set_status(
773 ComponentStatus::Stopped,
774 Some(format!("Failed to create consumer group: {e}")),
775 ).await;
776 return;
777 }
778
779 // Main consumer loop
780 loop {
781 // Read from stream using ">" to get next undelivered messages for this consumer group
782 let read_result: Result<StreamReadReply, redis::RedisError> =
783 redis::cmd("XREADGROUP")
784 .arg("GROUP")
785 .arg(&platform_config.consumer_group)
786 .arg(&platform_config.consumer_name)
787 .arg("COUNT")
788 .arg(platform_config.batch_size)
789 .arg("BLOCK")
790 .arg(platform_config.block_ms)
791 .arg("STREAMS")
792 .arg(&platform_config.stream_key)
793 .arg(">") // Always use ">" for consumer group reads
794 .query_async(&mut conn)
795 .await;
796
797 match read_result {
798 Ok(reply) => {
799 // Collect all stream IDs for batch acknowledgment
800 let mut all_stream_ids = Vec::new();
801
802 // Process each stream entry
803 for stream_key in reply.keys {
804 for stream_id in stream_key.ids {
805 debug!("Received event from stream: {}", stream_id.id);
806
807 // Store stream ID for batch acknowledgment
808 all_stream_ids.push(stream_id.id.clone());
809
810 // Extract event data
811 match extract_event_data(&stream_id.map) {
812 Ok(event_json) => {
813 // Parse JSON
814 match serde_json::from_str::<Value>(&event_json) {
815 Ok(cloud_event) => {
816 // Detect message type
817 let message_type =
818 detect_message_type(&cloud_event);
819
820 match message_type {
821 MessageType::Control(control_type) => {
822 // Handle control message
823 debug!(
824 "Detected control message of type: {control_type}"
825 );
826
827 match transform_control_event(
828 cloud_event,
829 &control_type,
830 ) {
831 Ok(control_events) => {
832 // Publish control events
833 for control_event in control_events
834 {
835 // Create profiling metadata with timestamps
836 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
837 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
838
839 let wrapper = SourceEventWrapper::with_profiling(
840 source_id.clone(),
841 SourceEvent::Control(control_event),
842 chrono::Utc::now(),
843 profiling,
844 );
845
846 // Dispatch via helper
847 if let Err(e) = SourceBase::dispatch_from_task(
848 dispatchers.clone(),
849 wrapper,
850 &source_id,
851 )
852 .await
853 {
854 debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
855 } else {
856 debug!(
857 "Published control event for stream {}",
858 stream_id.id
859 );
860 }
861 }
862 }
863 Err(e) => {
864 warn!(
865 "Failed to transform control event {}: {}",
866 stream_id.id, e
867 );
868 }
869 }
870 }
871 MessageType::Data => {
872 // Handle data message
873 match transform_platform_event(
874 cloud_event,
875 &source_id,
876 ) {
877 Ok(source_changes_with_timestamps) => {
878 // Publish source changes
879 for item in
880 source_changes_with_timestamps
881 {
882 // Create profiling metadata with timestamps
883 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
884 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
885
886 // Extract source_ns from SourceChange transaction time
887 profiling.source_ns = Some(
888 item.source_change
889 .get_transaction_time(),
890 );
891
892 // Set reactivator timestamps from event
893 profiling
894 .reactivator_start_ns =
895 item.reactivator_start_ns;
896 profiling.reactivator_end_ns =
897 item.reactivator_end_ns;
898
899 let wrapper = SourceEventWrapper::with_profiling(
900 source_id.clone(),
901 SourceEvent::Change(item.source_change),
902 chrono::Utc::now(),
903 profiling,
904 );
905
906 // Dispatch via helper
907 if let Err(e) = SourceBase::dispatch_from_task(
908 dispatchers.clone(),
909 wrapper,
910 &source_id,
911 )
912 .await
913 {
914 debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
915 } else {
916 debug!(
917 "Published source change for event {}",
918 stream_id.id
919 );
920 }
921 }
922 }
923 Err(e) => {
924 warn!(
925 "Failed to transform event {}: {}",
926 stream_id.id, e
927 );
928 reporter.set_status(
929 ComponentStatus::Running,
930 Some(format!(
931 "Transformation error: {e}"
932 )),
933 ).await;
934 }
935 }
936 }
937 }
938 }
939 Err(e) => {
940 warn!(
941 "Failed to parse JSON for event {}: {}",
942 stream_id.id, e
943 );
944 }
945 }
946 }
947 Err(e) => {
948 warn!(
949 "Failed to extract event data from {}: {}",
950 stream_id.id, e
951 );
952 }
953 }
954 }
955 }
956
957 // Batch acknowledge all messages at once
958 if !all_stream_ids.is_empty() {
959 debug!("Acknowledging batch of {} messages", all_stream_ids.len());
960
961 let mut cmd = redis::cmd("XACK");
962 cmd.arg(&platform_config.stream_key)
963 .arg(&platform_config.consumer_group);
964
965 // Add all stream IDs to the command
966 for stream_id in &all_stream_ids {
967 cmd.arg(stream_id);
968 }
969
970 match cmd.query_async::<_, i64>(&mut conn).await {
971 Ok(ack_count) => {
972 debug!("Successfully acknowledged {ack_count} messages");
973 if ack_count as usize != all_stream_ids.len() {
974 warn!(
975 "Acknowledged {} messages but expected {}",
976 ack_count,
977 all_stream_ids.len()
978 );
979 }
980 }
981 Err(e) => {
982 error!("Failed to acknowledge message batch: {e}");
983
984 // Fallback: Try acknowledging messages individually
985 warn!("Falling back to individual acknowledgments");
986 for stream_id in &all_stream_ids {
987 match redis::cmd("XACK")
988 .arg(&platform_config.stream_key)
989 .arg(&platform_config.consumer_group)
990 .arg(stream_id)
991 .query_async::<_, i64>(&mut conn)
992 .await
993 {
994 Ok(_) => {
995 debug!(
996 "Individually acknowledged message {stream_id}"
997 );
998 }
999 Err(e) => {
1000 error!(
1001 "Failed to individually acknowledge message {stream_id}: {e}"
1002 );
1003 }
1004 }
1005 }
1006 }
1007 }
1008 }
1009 }
1010 Err(e) => {
1011 // Check if it's a connection error
1012 if is_connection_error(&e) {
1013 error!("Redis connection lost: {e}");
1014 reporter.set_status(
1015 ComponentStatus::Running,
1016 Some(format!("Redis connection lost: {e}")),
1017 ).await;
1018
1019 // Try to reconnect
1020 match Self::connect_with_retry(
1021 &platform_config.redis_url,
1022 platform_config.max_retries,
1023 platform_config.retry_delay_ms,
1024 )
1025 .await
1026 {
1027 Ok(new_conn) => {
1028 conn = new_conn;
1029 info!("Reconnected to Redis");
1030 }
1031 Err(e) => {
1032 error!("Failed to reconnect to Redis: {e}");
1033 reporter.set_status(ComponentStatus::Stopped, None).await;
1034 return;
1035 }
1036 }
1037 } else if !e.to_string().contains("timeout") {
1038 // Log non-timeout errors
1039 error!("Error reading from stream: {e}");
1040 }
1041 }
1042 }
1043 }
1044 }.instrument(span))
1045 }
1046}
1047
1048#[async_trait::async_trait]
1049impl Source for PlatformSource {
1050 fn id(&self) -> &str {
1051 &self.base.id
1052 }
1053
1054 fn type_name(&self) -> &str {
1055 "platform"
1056 }
1057
1058 fn properties(&self) -> HashMap<String, serde_json::Value> {
1059 use crate::descriptor::PlatformSourceConfigDto;
1060 use drasi_plugin_sdk::ConfigValue;
1061
1062 let dto = PlatformSourceConfigDto {
1063 redis_url: ConfigValue::Static(self.config.redis_url.clone()),
1064 stream_key: ConfigValue::Static(self.config.stream_key.clone()),
1065 consumer_group: ConfigValue::Static(self.config.consumer_group.clone()),
1066 consumer_name: self
1067 .config
1068 .consumer_name
1069 .as_ref()
1070 .map(|n| ConfigValue::Static(n.clone())),
1071 batch_size: ConfigValue::Static(self.config.batch_size),
1072 block_ms: ConfigValue::Static(self.config.block_ms),
1073 };
1074
1075 match serde_json::to_value(&dto) {
1076 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
1077 _ => HashMap::new(),
1078 }
1079 }
1080
1081 fn auto_start(&self) -> bool {
1082 self.base.get_auto_start()
1083 }
1084
1085 async fn start(&self) -> Result<()> {
1086 info!("Starting platform source: {}", self.base.id);
1087
1088 // Extract configuration from typed config
1089 let platform_config = PlatformConfig {
1090 redis_url: self.config.redis_url.clone(),
1091 stream_key: self.config.stream_key.clone(),
1092 consumer_group: self.config.consumer_group.clone(),
1093 consumer_name: self
1094 .config
1095 .consumer_name
1096 .clone()
1097 .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1098 batch_size: self.config.batch_size,
1099 block_ms: self.config.block_ms,
1100 start_id: ">".to_string(),
1101 always_create_consumer_group: false,
1102 max_retries: 5,
1103 retry_delay_ms: 1000,
1104 };
1105
1106 // Update status
1107 self.base
1108 .set_status(
1109 ComponentStatus::Running,
1110 Some("Platform source running".to_string()),
1111 )
1112 .await;
1113
1114 // Get instance_id from context for log routing isolation
1115 let instance_id = self
1116 .base
1117 .context()
1118 .await
1119 .map(|c| c.instance_id)
1120 .unwrap_or_default();
1121
1122 // Start consumer task
1123 let task = Self::start_consumer_task(
1124 self.base.id.clone(),
1125 instance_id,
1126 platform_config,
1127 self.base.dispatchers.clone(),
1128 self.base.status_handle(),
1129 )
1130 .await;
1131
1132 *self.base.task_handle.write().await = Some(task);
1133
1134 Ok(())
1135 }
1136
1137 async fn stop(&self) -> Result<()> {
1138 info!("Stopping platform source: {}", self.base.id);
1139
1140 // Cancel the task
1141 if let Some(handle) = self.base.task_handle.write().await.take() {
1142 handle.abort();
1143 }
1144
1145 // Update status
1146 self.base
1147 .set_status(
1148 ComponentStatus::Stopped,
1149 Some("Platform source stopped".to_string()),
1150 )
1151 .await;
1152
1153 Ok(())
1154 }
1155
1156 async fn status(&self) -> ComponentStatus {
1157 self.base.get_status().await
1158 }
1159
1160 async fn subscribe(
1161 &self,
1162 settings: drasi_lib::config::SourceSubscriptionSettings,
1163 ) -> Result<SubscriptionResponse> {
1164 self.base
1165 .subscribe_with_bootstrap(&settings, "Platform")
1166 .await
1167 }
1168
1169 fn as_any(&self) -> &dyn std::any::Any {
1170 self
1171 }
1172
1173 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1174 self.base.initialize(context).await;
1175 }
1176
1177 async fn set_bootstrap_provider(
1178 &self,
1179 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1180 ) {
1181 self.base.set_bootstrap_provider(provider).await;
1182 }
1183}
1184
1185impl PlatformSource {
1186 /// Create a test subscription to this source (synchronous)
1187 ///
1188 /// This method delegates to SourceBase and is provided for convenience in tests.
1189 /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1190 pub fn test_subscribe(
1191 &self,
1192 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1193 self.base.test_subscribe()
1194 }
1195
1196 /// Create a test subscription to this source (async)
1197 ///
1198 /// This method delegates to SourceBase and is provided for convenience in async tests.
1199 /// Prefer this method over test_subscribe() in async contexts.
1200 pub async fn test_subscribe_async(
1201 &self,
1202 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1203 self.base
1204 .create_streaming_receiver()
1205 .await
1206 .expect("Failed to create test subscription")
1207 }
1208}
1209
1210/// Extract event data from Redis stream entry
1211fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1212 // Look for common field names
1213 for key in &["data", "event", "payload", "message"] {
1214 if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1215 return String::from_utf8(data.clone())
1216 .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1217 }
1218 }
1219
1220 Err(anyhow::anyhow!(
1221 "No event data found in stream entry. Available keys: {:?}",
1222 entry_map.keys().collect::<Vec<_>>()
1223 ))
1224}
1225
1226/// Check if error is a connection error
1227fn is_connection_error(e: &redis::RedisError) -> bool {
1228 e.is_connection_dropped()
1229 || e.is_io_error()
1230 || e.to_string().contains("connection")
1231 || e.to_string().contains("EOF")
1232}
1233
/// Kind of message carried by a CloudEvent read from the platform stream.
///
/// Produced by `detect_message_type` from the first entry of the event's `data`
/// array. `Eq` is derived alongside `PartialEq` because every field supports full
/// equality.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageType {
    /// Control message; holds the control type taken from `payload.source.table`.
    Control(String),
    /// Data message (node/relation change).
    Data,
}
1242
1243/// Detect message type based on payload.source.db field
1244///
1245/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1246/// Returns MessageType::Data for all other cases
1247fn detect_message_type(cloud_event: &Value) -> MessageType {
1248 // Extract data array and get first event to check message type
1249 let data_array = match cloud_event["data"].as_array() {
1250 Some(arr) if !arr.is_empty() => arr,
1251 _ => return MessageType::Data, // Default to data if no data array
1252 };
1253
1254 // Check the first event's source.db field
1255 let first_event = &data_array[0];
1256 let source_db = first_event["payload"]["source"]["db"]
1257 .as_str()
1258 .unwrap_or("");
1259
1260 // Case-insensitive comparison with "Drasi"
1261 if source_db.eq_ignore_ascii_case("drasi") {
1262 // Extract source.table to determine control type
1263 let control_type = first_event["payload"]["source"]["table"]
1264 .as_str()
1265 .unwrap_or("Unknown")
1266 .to_string();
1267 MessageType::Control(control_type)
1268 } else {
1269 MessageType::Data
1270 }
1271}
1272
1273/// Helper struct to hold SourceChange along with reactivator timestamps
1274#[derive(Debug)]
1275struct SourceChangeWithTimestamps {
1276 source_change: SourceChange,
1277 reactivator_start_ns: Option<u64>,
1278 reactivator_end_ns: Option<u64>,
1279}
1280
1281/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1282///
1283/// Handles events in CloudEvent format with a data array containing change events.
1284/// Each event in the data array has: op, payload.after/before, payload.source
1285fn transform_platform_event(
1286 cloud_event: Value,
1287 source_id: &str,
1288) -> Result<Vec<SourceChangeWithTimestamps>> {
1289 let mut source_changes = Vec::new();
1290
1291 // Extract the data array from CloudEvent wrapper
1292 let data_array = cloud_event["data"]
1293 .as_array()
1294 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1295
1296 // Process each event in the data array
1297 for event in data_array {
1298 // Extract reactivator timestamps from top-level event fields
1299 let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1300 let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1301
1302 // Extract operation type (op field: "i", "u", "d")
1303 let op = event["op"]
1304 .as_str()
1305 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1306
1307 // Extract payload
1308 let payload = &event["payload"];
1309 if payload.is_null() {
1310 return Err(anyhow::anyhow!("Missing 'payload' field"));
1311 }
1312
1313 // Extract element type from payload.source.table
1314 let element_type = payload["source"]["table"]
1315 .as_str()
1316 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1317
1318 // Get element data based on operation
1319 let element_data = match op {
1320 "i" | "u" => &payload["after"],
1321 "d" => &payload["before"],
1322 _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1323 };
1324
1325 if element_data.is_null() {
1326 return Err(anyhow::anyhow!(
1327 "Missing element data (after/before) for operation {op}"
1328 ));
1329 }
1330
1331 // Extract element ID
1332 let element_id = element_data["id"]
1333 .as_str()
1334 .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1335
1336 // Extract labels
1337 let labels_array = element_data["labels"]
1338 .as_array()
1339 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1340
1341 let labels: Vec<Arc<str>> = labels_array
1342 .iter()
1343 .filter_map(|v| v.as_str().map(Arc::from))
1344 .collect();
1345
1346 if labels.is_empty() {
1347 return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1348 }
1349
1350 // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1351 let ts_ns = payload["source"]["ts_ns"]
1352 .as_u64()
1353 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1354 let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1355
1356 // Build ElementMetadata
1357 let reference = ElementReference::new(source_id, element_id);
1358 let metadata = ElementMetadata {
1359 reference,
1360 labels: labels.into(),
1361 effective_from,
1362 };
1363
1364 // Handle delete operation (no properties needed)
1365 if op == "d" {
1366 source_changes.push(SourceChangeWithTimestamps {
1367 source_change: SourceChange::Delete { metadata },
1368 reactivator_start_ns,
1369 reactivator_end_ns,
1370 });
1371 continue;
1372 }
1373
1374 // Convert properties
1375 let properties_obj = element_data["properties"]
1376 .as_object()
1377 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1378
1379 let properties = convert_json_to_element_properties(properties_obj);
1380
1381 // Build element based on type
1382 let element = match element_type {
1383 "node" => Element::Node {
1384 metadata,
1385 properties,
1386 },
1387 "rel" | "relation" => {
1388 // Extract startId and endId
1389 let start_id = element_data["startId"]
1390 .as_str()
1391 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1392 let end_id = element_data["endId"]
1393 .as_str()
1394 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1395
1396 Element::Relation {
1397 metadata,
1398 properties,
1399 in_node: ElementReference::new(source_id, start_id),
1400 out_node: ElementReference::new(source_id, end_id),
1401 }
1402 }
1403 _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1404 };
1405
1406 // Build SourceChange
1407 let source_change = match op {
1408 "i" => SourceChange::Insert { element },
1409 "u" => SourceChange::Update { element },
1410 _ => unreachable!(),
1411 };
1412
1413 source_changes.push(SourceChangeWithTimestamps {
1414 source_change,
1415 reactivator_start_ns,
1416 reactivator_end_ns,
1417 });
1418 }
1419
1420 Ok(source_changes)
1421}
1422
1423/// Transform CloudEvent-wrapped control event to SourceControl(s)
1424///
1425/// Handles control messages from Query API service with source.db = "Drasi"
1426/// Currently supports "SourceSubscription" control type
1427fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1428 let mut control_events = Vec::new();
1429
1430 // Extract the data array from CloudEvent wrapper
1431 let data_array = cloud_event["data"]
1432 .as_array()
1433 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1434
1435 // Check if control type is supported
1436 if control_type != "SourceSubscription" {
1437 info!(
1438 "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1439 );
1440 return Ok(control_events); // Return empty vec for unknown types
1441 }
1442
1443 // Process each event in the data array
1444 for event in data_array {
1445 // Extract operation type (op field: "i", "u", "d")
1446 let op = event["op"]
1447 .as_str()
1448 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1449
1450 // Extract payload
1451 let payload = &event["payload"];
1452 if payload.is_null() {
1453 warn!("Missing 'payload' field in control event, skipping");
1454 continue;
1455 }
1456
1457 // Get data based on operation
1458 let control_data = match op {
1459 "i" | "u" => &payload["after"],
1460 "d" => &payload["before"],
1461 _ => {
1462 warn!("Unknown operation type in control event: {op}, skipping");
1463 continue;
1464 }
1465 };
1466
1467 if control_data.is_null() {
1468 warn!("Missing control data (after/before) for operation {op}, skipping");
1469 continue;
1470 }
1471
1472 // Extract required fields for SourceSubscription
1473 let query_id = match control_data["queryId"].as_str() {
1474 Some(id) => id.to_string(),
1475 None => {
1476 warn!("Missing required 'queryId' field in control event, skipping");
1477 continue;
1478 }
1479 };
1480
1481 let query_node_id = match control_data["queryNodeId"].as_str() {
1482 Some(id) => id.to_string(),
1483 None => {
1484 warn!("Missing required 'queryNodeId' field in control event, skipping");
1485 continue;
1486 }
1487 };
1488
1489 // Extract optional label arrays (default to empty if missing)
1490 let node_labels = control_data["nodeLabels"]
1491 .as_array()
1492 .map(|arr| {
1493 arr.iter()
1494 .filter_map(|v| v.as_str().map(String::from))
1495 .collect()
1496 })
1497 .unwrap_or_default();
1498
1499 let rel_labels = control_data["relLabels"]
1500 .as_array()
1501 .map(|arr| {
1502 arr.iter()
1503 .filter_map(|v| v.as_str().map(String::from))
1504 .collect()
1505 })
1506 .unwrap_or_default();
1507
1508 // Map operation to ControlOperation
1509 let operation = match op {
1510 "i" => ControlOperation::Insert,
1511 "u" => ControlOperation::Update,
1512 "d" => ControlOperation::Delete,
1513 _ => unreachable!(), // Already filtered above
1514 };
1515
1516 // Build SourceControl::Subscription
1517 let control_event = SourceControl::Subscription {
1518 query_id,
1519 query_node_id,
1520 node_labels,
1521 rel_labels,
1522 operation,
1523 };
1524
1525 control_events.push(control_event);
1526 }
1527
1528 Ok(control_events)
1529}
1530
// Dynamic plugin entry point.
//
// When built with the `dynamic-plugin` feature, this exports the crate through the
// Drasi plugin SDK as `platform-source`. `PlatformSourceDescriptor` is its only
// descriptor; no reaction or bootstrap descriptors are registered. All version
// fields come from this crate's `CARGO_PKG_VERSION`.
//
// These are plain comments because rustdoc ignores doc comments attached to macro
// invocations (rustc's `unused_doc_comments` lint).
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "platform-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PlatformSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);