drasi_source_platform/lib.rs
1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Platform Source Plugin for Drasi
17//!
18//! This plugin consumes data change events from Redis Streams, which is the primary
19//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
20//! containing node and relation changes, as well as control events for query subscriptions.
21//!
22//! # Architecture
23//!
24//! The platform source connects to Redis as a consumer group member, enabling:
25//! - **At-least-once delivery**: Messages are acknowledged after processing
26//! - **Horizontal scaling**: Multiple consumers can share the workload
27//! - **Fault tolerance**: Unacknowledged messages are redelivered
28//!
29//! # Configuration
30//!
31//! | Field | Type | Default | Description |
32//! |-------|------|---------|-------------|
33//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
34//! | `stream_key` | string | *required* | Redis stream key to consume from |
35//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
36//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
37//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
38//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
39//!
40//! # Data Format
41//!
42//! The platform source expects CloudEvent-wrapped messages with a `data` array
43//! containing change events. Each event includes an operation type and payload.
44//!
45//! ## Node Insert
46//!
47//! ```json
48//! {
49//! "data": [{
50//! "op": "i",
51//! "payload": {
52//! "after": {
53//! "id": "user-123",
54//! "labels": ["User"],
55//! "properties": {
56//! "name": "Alice",
57//! "email": "alice@example.com"
58//! }
59//! },
60//! "source": {
61//! "db": "mydb",
62//! "table": "node",
63//! "ts_ns": 1699900000000000000
64//! }
65//! }
66//! }]
67//! }
68//! ```
69//!
70//! ## Node Update
71//!
72//! ```json
73//! {
74//! "data": [{
75//! "op": "u",
76//! "payload": {
77//! "after": {
78//! "id": "user-123",
79//! "labels": ["User"],
80//! "properties": { "name": "Alice Updated" }
81//! },
82//! "source": { "table": "node", "ts_ns": 1699900001000000000 }
83//! }
84//! }]
85//! }
86//! ```
87//!
88//! ## Node Delete
89//!
90//! ```json
91//! {
92//! "data": [{
93//! "op": "d",
94//! "payload": {
95//! "before": {
96//! "id": "user-123",
97//! "labels": ["User"],
98//! "properties": {}
99//! },
100//! "source": { "table": "node", "ts_ns": 1699900002000000000 }
101//! }
102//! }]
103//! }
104//! ```
105//!
106//! ## Relation Insert
107//!
108//! ```json
109//! {
110//! "data": [{
111//! "op": "i",
112//! "payload": {
113//! "after": {
114//! "id": "follows-1",
115//! "labels": ["FOLLOWS"],
116//! "startId": "user-123",
117//! "endId": "user-456",
118//! "properties": { "since": "2024-01-01" }
119//! },
120//! "source": { "table": "rel", "ts_ns": 1699900003000000000 }
121//! }
122//! }]
123//! }
124//! ```
125//!
126//! # Control Events
127//!
128//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
129//! Currently supported control types:
130//!
131//! - **SourceSubscription**: Query subscription management
132//!
133//! # Example Configuration (YAML)
134//!
135//! ```yaml
136//! source_type: platform
137//! properties:
138//! redis_url: "redis://localhost:6379"
139//! stream_key: "my-app-changes"
140//! consumer_group: "drasi-consumers"
141//! batch_size: 50
142//! block_ms: 10000
143//! ```
144//!
145//! # Usage Example
146//!
147//! ```rust,ignore
//! use drasi_source_platform::PlatformSource;
//! use std::sync::Arc;
//!
//! // `builder` requires the source ID; `build()` returns the source itself.
//! let source = PlatformSource::builder("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
159//! ```
160
161pub mod config;
162pub mod descriptor;
163pub use config::PlatformSourceConfig;
164
165use anyhow::Result;
166use log::{debug, error, info, warn};
167use redis::streams::StreamReadReply;
168use serde_json::Value;
169use std::collections::HashMap;
170use std::sync::Arc;
171use std::time::Duration;
172use tokio::sync::RwLock;
173use tokio::task::JoinHandle;
174
175use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
176use drasi_lib::channels::{
177 ComponentStatus, ControlOperation, DispatchMode, SourceControl, SourceEvent,
178 SourceEventWrapper, SubscriptionResponse,
179};
180use drasi_lib::component_graph::ComponentStatusHandle;
181use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
182use drasi_lib::sources::manager::convert_json_to_element_properties;
183use drasi_lib::Source;
184use tracing::Instrument;
185
186#[cfg(test)]
187mod tests;
188
/// Runtime settings handed to the Redis stream consumer task.
///
/// This is the fully resolved, internal form of the configuration: every
/// value the consumer loop needs is present (no `Option`s).
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// URL used to open the Redis client
    redis_url: String,
    /// Key of the Redis stream that is consumed
    stream_key: String,
    /// Name of the consumer group used for `XREADGROUP`
    consumer_group: String,
    /// Name of this consumer inside the group (should be unique per instance)
    consumer_name: String,
    /// `COUNT` passed to each `XREADGROUP` call
    batch_size: usize,
    /// `BLOCK` timeout, in milliseconds, passed to each `XREADGROUP` call
    block_ms: u64,
    /// Position used when creating the consumer group
    /// (">" for new entries only, "0" for everything, or a specific ID)
    start_id: String,
    /// When `true`, the consumer group is destroyed and recreated at
    /// `start_id` on startup; when `false`, an existing group keeps its
    /// position and `start_id` only matters if the group has to be created
    always_create_consumer_group: bool,
    /// Upper bound on Redis connection attempts
    max_retries: usize,
    /// Initial pause between connection attempts, in milliseconds
    retry_delay_ms: u64,
}

impl Default for PlatformConfig {
    /// Baseline values used for optional settings that are not supplied.
    ///
    /// The connection identifiers default to empty strings and must be
    /// filled in by the caller.
    ///
    /// NOTE(review): `batch_size` defaults to 10 here while
    /// `PlatformSourceBuilder::build` falls back to 100 — confirm which
    /// value is intended.
    fn default() -> Self {
        const DEFAULT_BATCH_SIZE: usize = 10;
        const DEFAULT_BLOCK_MS: u64 = 5000;
        const DEFAULT_START_ID: &str = ">";
        const DEFAULT_MAX_RETRIES: usize = 3;
        const DEFAULT_RETRY_DELAY_MS: u64 = 1000;

        PlatformConfig {
            redis_url: String::new(),
            stream_key: String::new(),
            consumer_group: String::new(),
            consumer_name: String::new(),
            batch_size: DEFAULT_BATCH_SIZE,
            block_ms: DEFAULT_BLOCK_MS,
            start_id: String::from(DEFAULT_START_ID),
            always_create_consumer_group: false,
            max_retries: DEFAULT_MAX_RETRIES,
            retry_delay_ms: DEFAULT_RETRY_DELAY_MS,
        }
    }
}
232
/// Platform source that reads events from Redis Streams.
///
/// This source connects to a Redis instance and consumes CloudEvent-wrapped
/// messages from a stream using consumer groups. It supports both data events
/// (node/relation changes) and control events (query subscriptions).
///
/// Instances are created with [`PlatformSource::new`] or through
/// [`PlatformSource::builder`].
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: Platform-specific configuration (Redis connection, stream settings)
pub struct PlatformSource {
    /// Base source implementation providing common functionality
    /// (ID, status, dispatchers and the handle of the consumer task)
    base: SourceBase,
    /// Platform source configuration; converted into the internal
    /// `PlatformConfig` each time the source is started
    config: PlatformSourceConfig,
}
249
/// Builder for creating [`PlatformSource`] instances.
///
/// Provides a fluent API for constructing platform sources
/// with sensible defaults. Optional settings are stored as `Option`s and
/// resolved to their defaults when `build()` is called.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_platform::PlatformSource;
///
/// let source = PlatformSource::builder("my-platform-source")
///     .with_redis_url("redis://localhost:6379")
///     .with_stream_key("my-app-changes")
///     .with_consumer_group("my-consumers")
///     .with_batch_size(50)
///     .build()?;
/// ```
pub struct PlatformSourceBuilder {
    /// Identifier given to the source being built
    id: String,
    /// Redis connection URL (empty until set)
    redis_url: String,
    /// Redis stream key to consume from (empty until set)
    stream_key: String,
    /// Consumer group name; `build()` falls back to `"drasi-core"`
    consumer_group: Option<String>,
    /// Consumer name within the group; passed through unchanged by `build()`
    consumer_name: Option<String>,
    /// Events per read; `build()` falls back to 100
    batch_size: Option<usize>,
    /// Blocking read timeout in milliseconds; `build()` falls back to 5000
    block_ms: Option<u64>,
    /// Dispatch mode forwarded to the base source when set
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity forwarded to the base source when set
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider forwarded to the base source when set
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source auto-starts; initialised to `true`
    auto_start: bool,
}
280
281impl PlatformSourceBuilder {
282 /// Create a new builder with the given ID and default values.
283 pub fn new(id: impl Into<String>) -> Self {
284 Self {
285 id: id.into(),
286 redis_url: String::new(),
287 stream_key: String::new(),
288 consumer_group: None,
289 consumer_name: None,
290 batch_size: None,
291 block_ms: None,
292 dispatch_mode: None,
293 dispatch_buffer_capacity: None,
294 bootstrap_provider: None,
295 auto_start: true,
296 }
297 }
298
299 /// Set the Redis connection URL.
300 ///
301 /// # Arguments
302 ///
303 /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
304 pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
305 self.redis_url = url.into();
306 self
307 }
308
309 /// Set the Redis stream key to consume from.
310 ///
311 /// # Arguments
312 ///
313 /// * `key` - Name of the Redis stream
314 pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
315 self.stream_key = key.into();
316 self
317 }
318
319 /// Set the consumer group name.
320 ///
321 /// # Arguments
322 ///
323 /// * `group` - Consumer group name (default: `"drasi-core"`)
324 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
325 self.consumer_group = Some(group.into());
326 self
327 }
328
329 /// Set a unique consumer name within the group.
330 ///
331 /// # Arguments
332 ///
333 /// * `name` - Unique consumer name (auto-generated if not specified)
334 pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
335 self.consumer_name = Some(name.into());
336 self
337 }
338
339 /// Set the batch size for reading events.
340 ///
341 /// # Arguments
342 ///
343 /// * `size` - Number of events to read per XREADGROUP call (default: 100)
344 pub fn with_batch_size(mut self, size: usize) -> Self {
345 self.batch_size = Some(size);
346 self
347 }
348
349 /// Set the block timeout for waiting on events.
350 ///
351 /// # Arguments
352 ///
353 /// * `ms` - Milliseconds to block waiting for events (default: 5000)
354 pub fn with_block_ms(mut self, ms: u64) -> Self {
355 self.block_ms = Some(ms);
356 self
357 }
358
359 /// Set the dispatch mode for this source
360 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
361 self.dispatch_mode = Some(mode);
362 self
363 }
364
365 /// Set the dispatch buffer capacity for this source
366 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
367 self.dispatch_buffer_capacity = Some(capacity);
368 self
369 }
370
371 /// Set the bootstrap provider for this source
372 pub fn with_bootstrap_provider(
373 mut self,
374 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
375 ) -> Self {
376 self.bootstrap_provider = Some(Box::new(provider));
377 self
378 }
379
380 /// Set whether this source should auto-start when DrasiLib starts.
381 ///
382 /// Default is `true`. Set to `false` if this source should only be
383 /// started manually via `start_source()`.
384 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
385 self.auto_start = auto_start;
386 self
387 }
388
389 /// Set the full configuration at once
390 pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
391 self.redis_url = config.redis_url;
392 self.stream_key = config.stream_key;
393 self.consumer_group = Some(config.consumer_group);
394 self.consumer_name = config.consumer_name;
395 self.batch_size = Some(config.batch_size);
396 self.block_ms = Some(config.block_ms);
397 self
398 }
399
400 /// Build the platform source.
401 ///
402 /// # Errors
403 ///
404 /// Returns an error if the source cannot be constructed.
405 pub fn build(self) -> Result<PlatformSource> {
406 let config = PlatformSourceConfig {
407 redis_url: self.redis_url,
408 stream_key: self.stream_key,
409 consumer_group: self
410 .consumer_group
411 .unwrap_or_else(|| "drasi-core".to_string()),
412 consumer_name: self.consumer_name,
413 batch_size: self.batch_size.unwrap_or(100),
414 block_ms: self.block_ms.unwrap_or(5000),
415 };
416
417 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
418 if let Some(mode) = self.dispatch_mode {
419 params = params.with_dispatch_mode(mode);
420 }
421 if let Some(capacity) = self.dispatch_buffer_capacity {
422 params = params.with_dispatch_buffer_capacity(capacity);
423 }
424 if let Some(provider) = self.bootstrap_provider {
425 params = params.with_bootstrap_provider(provider);
426 }
427
428 Ok(PlatformSource {
429 base: SourceBase::new(params)?,
430 config,
431 })
432 }
433}
434
435impl PlatformSource {
    /// Create a builder for PlatformSource
    ///
    /// Shorthand for [`PlatformSourceBuilder::new`] with the given source ID.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use drasi_source_platform::PlatformSource;
    ///
    /// let source = PlatformSource::builder("my-platform-source")
    ///     .with_redis_url("redis://localhost:6379")
    ///     .with_stream_key("my-changes")
    ///     .with_bootstrap_provider(my_provider)
    ///     .build()?;
    /// ```
    pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
        PlatformSourceBuilder::new(id)
    }
452
453 /// Create a new platform source.
454 ///
455 /// The event channel is automatically injected when the source is added
456 /// to DrasiLib via `add_source()`.
457 ///
458 /// # Arguments
459 ///
460 /// * `id` - Unique identifier for this source instance
461 /// * `config` - Platform source configuration
462 ///
463 /// # Returns
464 ///
465 /// A new `PlatformSource` instance, or an error if construction fails.
466 ///
467 /// # Errors
468 ///
469 /// Returns an error if the base source cannot be initialized.
470 ///
471 /// # Example
472 ///
473 /// ```rust,ignore
474 /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
475 ///
476 /// let config = PlatformSourceBuilder::new("my-platform-source")
477 /// .with_redis_url("redis://localhost:6379")
478 /// .with_stream_key("my-changes")
479 /// .build()?;
480 /// ```
481 pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
482 let id = id.into();
483 let params = SourceBaseParams::new(id);
484 Ok(Self {
485 base: SourceBase::new(params)?,
486 config,
487 })
488 }
489
490 /// Subscribe to source events (for testing)
491 ///
492 /// This method is intended for use in tests to receive events broadcast by the source.
493 /// In production, queries subscribe to sources through the SourceManager.
494 /// Parse configuration from properties
495 #[allow(dead_code)]
496 fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
497 // Extract required fields first
498 let redis_url = properties
499 .get("redis_url")
500 .and_then(|v| v.as_str())
501 .ok_or_else(|| {
502 anyhow::anyhow!(
503 "Configuration error: Missing required field 'redis_url'. \
504 Platform source requires a Redis connection URL"
505 )
506 })?
507 .to_string();
508
509 let stream_key = properties
510 .get("stream_key")
511 .and_then(|v| v.as_str())
512 .ok_or_else(|| {
513 anyhow::anyhow!(
514 "Configuration error: Missing required field 'stream_key'. \
515 Platform source requires a Redis Stream key to read from"
516 )
517 })?
518 .to_string();
519
520 let consumer_group = properties
521 .get("consumer_group")
522 .and_then(|v| v.as_str())
523 .ok_or_else(|| {
524 anyhow::anyhow!(
525 "Configuration error: Missing required field 'consumer_group'. \
526 Platform source requires a consumer group name"
527 )
528 })?
529 .to_string();
530
531 let consumer_name = properties
532 .get("consumer_name")
533 .and_then(|v| v.as_str())
534 .ok_or_else(|| {
535 anyhow::anyhow!(
536 "Configuration error: Missing required field 'consumer_name'. \
537 Platform source requires a unique consumer name"
538 )
539 })?
540 .to_string();
541
542 // Get defaults for optional field handling
543 let defaults = PlatformConfig::default();
544
545 // Build config with optional fields
546 let config = PlatformConfig {
547 redis_url,
548 stream_key,
549 consumer_group,
550 consumer_name,
551 batch_size: properties
552 .get("batch_size")
553 .and_then(|v| v.as_u64())
554 .map(|v| v as usize)
555 .unwrap_or(defaults.batch_size),
556 block_ms: properties
557 .get("block_ms")
558 .and_then(|v| v.as_u64())
559 .unwrap_or(defaults.block_ms),
560 start_id: properties
561 .get("start_id")
562 .and_then(|v| v.as_str())
563 .map(|s| s.to_string())
564 .unwrap_or(defaults.start_id),
565 always_create_consumer_group: properties
566 .get("always_create_consumer_group")
567 .and_then(|v| v.as_bool())
568 .unwrap_or(defaults.always_create_consumer_group),
569 max_retries: properties
570 .get("max_retries")
571 .and_then(|v| v.as_u64())
572 .map(|v| v as usize)
573 .unwrap_or(defaults.max_retries),
574 retry_delay_ms: properties
575 .get("retry_delay_ms")
576 .and_then(|v| v.as_u64())
577 .unwrap_or(defaults.retry_delay_ms),
578 };
579
580 // Validate
581 if config.redis_url.is_empty() {
582 return Err(anyhow::anyhow!(
583 "Validation error: redis_url cannot be empty. \
584 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
585 ));
586 }
587 if config.stream_key.is_empty() {
588 return Err(anyhow::anyhow!(
589 "Validation error: stream_key cannot be empty. \
590 Please specify the Redis Stream key to read from"
591 ));
592 }
593 if config.consumer_group.is_empty() {
594 return Err(anyhow::anyhow!(
595 "Validation error: consumer_group cannot be empty. \
596 Please specify a consumer group name for this source"
597 ));
598 }
599 if config.consumer_name.is_empty() {
600 return Err(anyhow::anyhow!(
601 "Validation error: consumer_name cannot be empty. \
602 Please specify a unique consumer name within the consumer group"
603 ));
604 }
605
606 Ok(config)
607 }
608
609 /// Connect to Redis with retry logic
610 async fn connect_with_retry(
611 redis_url: &str,
612 max_retries: usize,
613 retry_delay_ms: u64,
614 ) -> Result<redis::aio::MultiplexedConnection> {
615 let client = redis::Client::open(redis_url)?;
616 let mut delay = retry_delay_ms;
617
618 for attempt in 0..max_retries {
619 match client.get_multiplexed_async_connection().await {
620 Ok(conn) => {
621 info!("Successfully connected to Redis");
622 return Ok(conn);
623 }
624 Err(e) if attempt < max_retries - 1 => {
625 warn!(
626 "Redis connection failed (attempt {}/{}): {}",
627 attempt + 1,
628 max_retries,
629 e
630 );
631 tokio::time::sleep(Duration::from_millis(delay)).await;
632 delay *= 2; // Exponential backoff
633 }
634 Err(e) => {
635 return Err(anyhow::anyhow!(
636 "Failed to connect to Redis after {max_retries} attempts: {e}"
637 ));
638 }
639 }
640 }
641
642 unreachable!()
643 }
644
645 /// Create or recreate consumer group based on configuration
646 async fn create_consumer_group(
647 conn: &mut redis::aio::MultiplexedConnection,
648 stream_key: &str,
649 consumer_group: &str,
650 start_id: &str,
651 always_create: bool,
652 ) -> Result<()> {
653 // Determine the initial position for the consumer group
654 let group_start_id = if start_id == ">" {
655 "$" // ">" means only new messages, so create group at end
656 } else {
657 start_id // "0" or specific ID
658 };
659
660 // If always_create is true, delete the existing group first
661 if always_create {
662 info!(
663 "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
664 );
665
666 let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
667 .arg("DESTROY")
668 .arg(stream_key)
669 .arg(consumer_group)
670 .query_async(conn)
671 .await;
672
673 match destroy_result {
674 Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
675 Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
676 Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
677 Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
678 }
679 }
680
681 // Try to create the consumer group
682 let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
683 .arg("CREATE")
684 .arg(stream_key)
685 .arg(consumer_group)
686 .arg(group_start_id)
687 .arg("MKSTREAM")
688 .query_async(conn)
689 .await;
690
691 match result {
692 Ok(_) => {
693 info!(
694 "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
695 );
696 Ok(())
697 }
698 Err(e) => {
699 // BUSYGROUP error means the group already exists
700 if e.to_string().contains("BUSYGROUP") {
701 info!(
702 "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
703 );
704 Ok(())
705 } else {
706 Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
707 }
708 }
709 }
710 }
711
712 /// Start the stream consumer task
713 async fn start_consumer_task(
714 source_id: String,
715 instance_id: String,
716 platform_config: PlatformConfig,
717 dispatchers: Arc<
718 RwLock<
719 Vec<
720 Box<
721 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
722 >,
723 >,
724 >,
725 >,
726 reporter: ComponentStatusHandle,
727 ) -> JoinHandle<()> {
728 let source_id_for_span = source_id.clone();
729 let span = tracing::info_span!(
730 "platform_source_consumer",
731 instance_id = %instance_id,
732 component_id = %source_id_for_span,
733 component_type = "source"
734 );
735 tokio::spawn(async move {
736 info!(
737 "Starting platform source consumer for source '{}' on stream '{}'",
738 source_id, platform_config.stream_key
739 );
740
741 // Connect to Redis
742 let mut conn = match Self::connect_with_retry(
743 &platform_config.redis_url,
744 platform_config.max_retries,
745 platform_config.retry_delay_ms,
746 )
747 .await
748 {
749 Ok(conn) => conn,
750 Err(e) => {
751 error!("Failed to connect to Redis: {e}");
752 reporter.set_status(
753 ComponentStatus::Stopped,
754 Some(format!("Failed to connect to Redis: {e}")),
755 ).await;
756 return;
757 }
758 };
759
760 // Create consumer group
761 if let Err(e) = Self::create_consumer_group(
762 &mut conn,
763 &platform_config.stream_key,
764 &platform_config.consumer_group,
765 &platform_config.start_id,
766 platform_config.always_create_consumer_group,
767 )
768 .await
769 {
770 error!("Failed to create consumer group: {e}");
771 reporter.set_status(
772 ComponentStatus::Stopped,
773 Some(format!("Failed to create consumer group: {e}")),
774 ).await;
775 return;
776 }
777
778 // Main consumer loop
779 loop {
780 // Read from stream using ">" to get next undelivered messages for this consumer group
781 let read_result: Result<StreamReadReply, redis::RedisError> =
782 redis::cmd("XREADGROUP")
783 .arg("GROUP")
784 .arg(&platform_config.consumer_group)
785 .arg(&platform_config.consumer_name)
786 .arg("COUNT")
787 .arg(platform_config.batch_size)
788 .arg("BLOCK")
789 .arg(platform_config.block_ms)
790 .arg("STREAMS")
791 .arg(&platform_config.stream_key)
792 .arg(">") // Always use ">" for consumer group reads
793 .query_async(&mut conn)
794 .await;
795
796 match read_result {
797 Ok(reply) => {
798 // Collect all stream IDs for batch acknowledgment
799 let mut all_stream_ids = Vec::new();
800
801 // Process each stream entry
802 for stream_key in reply.keys {
803 for stream_id in stream_key.ids {
804 debug!("Received event from stream: {}", stream_id.id);
805
806 // Store stream ID for batch acknowledgment
807 all_stream_ids.push(stream_id.id.clone());
808
809 // Extract event data
810 match extract_event_data(&stream_id.map) {
811 Ok(event_json) => {
812 // Parse JSON
813 match serde_json::from_str::<Value>(&event_json) {
814 Ok(cloud_event) => {
815 // Detect message type
816 let message_type =
817 detect_message_type(&cloud_event);
818
819 match message_type {
820 MessageType::Control(control_type) => {
821 // Handle control message
822 debug!(
823 "Detected control message of type: {control_type}"
824 );
825
826 match transform_control_event(
827 cloud_event,
828 &control_type,
829 ) {
830 Ok(control_events) => {
831 // Publish control events
832 for control_event in control_events
833 {
834 // Create profiling metadata with timestamps
835 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
836 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
837
838 let wrapper = SourceEventWrapper::with_profiling(
839 source_id.clone(),
840 SourceEvent::Control(control_event),
841 chrono::Utc::now(),
842 profiling,
843 );
844
845 // Dispatch via helper
846 if let Err(e) = SourceBase::dispatch_from_task(
847 dispatchers.clone(),
848 wrapper,
849 &source_id,
850 )
851 .await
852 {
853 debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
854 } else {
855 debug!(
856 "Published control event for stream {}",
857 stream_id.id
858 );
859 }
860 }
861 }
862 Err(e) => {
863 warn!(
864 "Failed to transform control event {}: {}",
865 stream_id.id, e
866 );
867 }
868 }
869 }
870 MessageType::Data => {
871 // Handle data message
872 match transform_platform_event(
873 cloud_event,
874 &source_id,
875 ) {
876 Ok(source_changes_with_timestamps) => {
877 // Publish source changes
878 for item in
879 source_changes_with_timestamps
880 {
881 // Create profiling metadata with timestamps
882 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
883 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
884
885 // Extract source_ns from SourceChange transaction time
886 profiling.source_ns = Some(
887 item.source_change
888 .get_transaction_time(),
889 );
890
891 // Set reactivator timestamps from event
892 profiling
893 .reactivator_start_ns =
894 item.reactivator_start_ns;
895 profiling.reactivator_end_ns =
896 item.reactivator_end_ns;
897
898 let wrapper = SourceEventWrapper::with_profiling(
899 source_id.clone(),
900 SourceEvent::Change(item.source_change),
901 chrono::Utc::now(),
902 profiling,
903 );
904
905 // Dispatch via helper
906 if let Err(e) = SourceBase::dispatch_from_task(
907 dispatchers.clone(),
908 wrapper,
909 &source_id,
910 )
911 .await
912 {
913 debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
914 } else {
915 debug!(
916 "Published source change for event {}",
917 stream_id.id
918 );
919 }
920 }
921 }
922 Err(e) => {
923 warn!(
924 "Failed to transform event {}: {}",
925 stream_id.id, e
926 );
927 reporter.set_status(
928 ComponentStatus::Running,
929 Some(format!(
930 "Transformation error: {e}"
931 )),
932 ).await;
933 }
934 }
935 }
936 }
937 }
938 Err(e) => {
939 warn!(
940 "Failed to parse JSON for event {}: {}",
941 stream_id.id, e
942 );
943 }
944 }
945 }
946 Err(e) => {
947 warn!(
948 "Failed to extract event data from {}: {}",
949 stream_id.id, e
950 );
951 }
952 }
953 }
954 }
955
956 // Batch acknowledge all messages at once
957 if !all_stream_ids.is_empty() {
958 debug!("Acknowledging batch of {} messages", all_stream_ids.len());
959
960 let mut cmd = redis::cmd("XACK");
961 cmd.arg(&platform_config.stream_key)
962 .arg(&platform_config.consumer_group);
963
964 // Add all stream IDs to the command
965 for stream_id in &all_stream_ids {
966 cmd.arg(stream_id);
967 }
968
969 match cmd.query_async::<_, i64>(&mut conn).await {
970 Ok(ack_count) => {
971 debug!("Successfully acknowledged {ack_count} messages");
972 if ack_count as usize != all_stream_ids.len() {
973 warn!(
974 "Acknowledged {} messages but expected {}",
975 ack_count,
976 all_stream_ids.len()
977 );
978 }
979 }
980 Err(e) => {
981 error!("Failed to acknowledge message batch: {e}");
982
983 // Fallback: Try acknowledging messages individually
984 warn!("Falling back to individual acknowledgments");
985 for stream_id in &all_stream_ids {
986 match redis::cmd("XACK")
987 .arg(&platform_config.stream_key)
988 .arg(&platform_config.consumer_group)
989 .arg(stream_id)
990 .query_async::<_, i64>(&mut conn)
991 .await
992 {
993 Ok(_) => {
994 debug!(
995 "Individually acknowledged message {stream_id}"
996 );
997 }
998 Err(e) => {
999 error!(
1000 "Failed to individually acknowledge message {stream_id}: {e}"
1001 );
1002 }
1003 }
1004 }
1005 }
1006 }
1007 }
1008 }
1009 Err(e) => {
1010 // Check if it's a connection error
1011 if is_connection_error(&e) {
1012 error!("Redis connection lost: {e}");
1013 reporter.set_status(
1014 ComponentStatus::Running,
1015 Some(format!("Redis connection lost: {e}")),
1016 ).await;
1017
1018 // Try to reconnect
1019 match Self::connect_with_retry(
1020 &platform_config.redis_url,
1021 platform_config.max_retries,
1022 platform_config.retry_delay_ms,
1023 )
1024 .await
1025 {
1026 Ok(new_conn) => {
1027 conn = new_conn;
1028 info!("Reconnected to Redis");
1029 }
1030 Err(e) => {
1031 error!("Failed to reconnect to Redis: {e}");
1032 reporter.set_status(ComponentStatus::Stopped, None).await;
1033 return;
1034 }
1035 }
1036 } else if !e.to_string().contains("timeout") {
1037 // Log non-timeout errors
1038 error!("Error reading from stream: {e}");
1039 }
1040 }
1041 }
1042 }
1043 }.instrument(span))
1044 }
1045}
1046
1047#[async_trait::async_trait]
1048impl Source for PlatformSource {
1049 fn id(&self) -> &str {
1050 &self.base.id
1051 }
1052
1053 fn type_name(&self) -> &str {
1054 "platform"
1055 }
1056
1057 fn properties(&self) -> HashMap<String, serde_json::Value> {
1058 use crate::descriptor::PlatformSourceConfigDto;
1059 use drasi_plugin_sdk::ConfigValue;
1060
1061 let dto = PlatformSourceConfigDto {
1062 redis_url: ConfigValue::Static(self.config.redis_url.clone()),
1063 stream_key: ConfigValue::Static(self.config.stream_key.clone()),
1064 consumer_group: ConfigValue::Static(self.config.consumer_group.clone()),
1065 consumer_name: self
1066 .config
1067 .consumer_name
1068 .as_ref()
1069 .map(|n| ConfigValue::Static(n.clone())),
1070 batch_size: ConfigValue::Static(self.config.batch_size),
1071 block_ms: ConfigValue::Static(self.config.block_ms),
1072 };
1073
1074 match serde_json::to_value(&dto) {
1075 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
1076 _ => HashMap::new(),
1077 }
1078 }
1079
1080 fn auto_start(&self) -> bool {
1081 self.base.get_auto_start()
1082 }
1083
1084 async fn start(&self) -> Result<()> {
1085 info!("Starting platform source: {}", self.base.id);
1086
1087 // Extract configuration from typed config
1088 let platform_config = PlatformConfig {
1089 redis_url: self.config.redis_url.clone(),
1090 stream_key: self.config.stream_key.clone(),
1091 consumer_group: self.config.consumer_group.clone(),
1092 consumer_name: self
1093 .config
1094 .consumer_name
1095 .clone()
1096 .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1097 batch_size: self.config.batch_size,
1098 block_ms: self.config.block_ms,
1099 start_id: ">".to_string(),
1100 always_create_consumer_group: false,
1101 max_retries: 5,
1102 retry_delay_ms: 1000,
1103 };
1104
1105 // Update status
1106 self.base
1107 .set_status(
1108 ComponentStatus::Running,
1109 Some("Platform source running".to_string()),
1110 )
1111 .await;
1112
1113 // Get instance_id from context for log routing isolation
1114 let instance_id = self
1115 .base
1116 .context()
1117 .await
1118 .map(|c| c.instance_id)
1119 .unwrap_or_default();
1120
1121 // Start consumer task
1122 let task = Self::start_consumer_task(
1123 self.base.id.clone(),
1124 instance_id,
1125 platform_config,
1126 self.base.dispatchers.clone(),
1127 self.base.status_handle(),
1128 )
1129 .await;
1130
1131 *self.base.task_handle.write().await = Some(task);
1132
1133 Ok(())
1134 }
1135
1136 async fn stop(&self) -> Result<()> {
1137 info!("Stopping platform source: {}", self.base.id);
1138
1139 // Cancel the task
1140 if let Some(handle) = self.base.task_handle.write().await.take() {
1141 handle.abort();
1142 }
1143
1144 // Update status
1145 self.base
1146 .set_status(
1147 ComponentStatus::Stopped,
1148 Some("Platform source stopped".to_string()),
1149 )
1150 .await;
1151
1152 Ok(())
1153 }
1154
1155 async fn status(&self) -> ComponentStatus {
1156 self.base.get_status().await
1157 }
1158
1159 async fn subscribe(
1160 &self,
1161 settings: drasi_lib::config::SourceSubscriptionSettings,
1162 ) -> Result<SubscriptionResponse> {
1163 self.base
1164 .subscribe_with_bootstrap(&settings, "Platform")
1165 .await
1166 }
1167
1168 fn as_any(&self) -> &dyn std::any::Any {
1169 self
1170 }
1171
1172 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1173 self.base.initialize(context).await;
1174 }
1175
1176 async fn set_bootstrap_provider(
1177 &self,
1178 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1179 ) {
1180 self.base.set_bootstrap_provider(provider).await;
1181 }
1182}
1183
1184impl PlatformSource {
1185 /// Create a test subscription to this source (synchronous)
1186 ///
1187 /// This method delegates to SourceBase and is provided for convenience in tests.
1188 /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1189 pub fn test_subscribe(
1190 &self,
1191 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1192 self.base.test_subscribe()
1193 }
1194
1195 /// Create a test subscription to this source (async)
1196 ///
1197 /// This method delegates to SourceBase and is provided for convenience in async tests.
1198 /// Prefer this method over test_subscribe() in async contexts.
1199 pub async fn test_subscribe_async(
1200 &self,
1201 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1202 self.base
1203 .create_streaming_receiver()
1204 .await
1205 .expect("Failed to create test subscription")
1206 }
1207}
1208
1209/// Extract event data from Redis stream entry
1210fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1211 // Look for common field names
1212 for key in &["data", "event", "payload", "message"] {
1213 if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1214 return String::from_utf8(data.clone())
1215 .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1216 }
1217 }
1218
1219 Err(anyhow::anyhow!(
1220 "No event data found in stream entry. Available keys: {:?}",
1221 entry_map.keys().collect::<Vec<_>>()
1222 ))
1223}
1224
1225/// Check if error is a connection error
1226fn is_connection_error(e: &redis::RedisError) -> bool {
1227 e.is_connection_dropped()
1228 || e.is_io_error()
1229 || e.to_string().contains("connection")
1230 || e.to_string().contains("EOF")
1231}
1232
/// Discriminates the two kinds of messages read from the stream.
///
/// `Eq` is derived alongside `PartialEq` because equality on this type is a
/// full equivalence relation (the only payload is a `String`).
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageType {
    /// Control message; carries the control type taken from `source.table`.
    Control(String),
    /// Data message (node/relation change).
    Data,
}
1241
1242/// Detect message type based on payload.source.db field
1243///
1244/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1245/// Returns MessageType::Data for all other cases
1246fn detect_message_type(cloud_event: &Value) -> MessageType {
1247 // Extract data array and get first event to check message type
1248 let data_array = match cloud_event["data"].as_array() {
1249 Some(arr) if !arr.is_empty() => arr,
1250 _ => return MessageType::Data, // Default to data if no data array
1251 };
1252
1253 // Check the first event's source.db field
1254 let first_event = &data_array[0];
1255 let source_db = first_event["payload"]["source"]["db"]
1256 .as_str()
1257 .unwrap_or("");
1258
1259 // Case-insensitive comparison with "Drasi"
1260 if source_db.eq_ignore_ascii_case("drasi") {
1261 // Extract source.table to determine control type
1262 let control_type = first_event["payload"]["source"]["table"]
1263 .as_str()
1264 .unwrap_or("Unknown")
1265 .to_string();
1266 MessageType::Control(control_type)
1267 } else {
1268 MessageType::Data
1269 }
1270}
1271
1272/// Helper struct to hold SourceChange along with reactivator timestamps
1273#[derive(Debug)]
1274struct SourceChangeWithTimestamps {
1275 source_change: SourceChange,
1276 reactivator_start_ns: Option<u64>,
1277 reactivator_end_ns: Option<u64>,
1278}
1279
1280/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1281///
1282/// Handles events in CloudEvent format with a data array containing change events.
1283/// Each event in the data array has: op, payload.after/before, payload.source
1284fn transform_platform_event(
1285 cloud_event: Value,
1286 source_id: &str,
1287) -> Result<Vec<SourceChangeWithTimestamps>> {
1288 let mut source_changes = Vec::new();
1289
1290 // Extract the data array from CloudEvent wrapper
1291 let data_array = cloud_event["data"]
1292 .as_array()
1293 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1294
1295 // Process each event in the data array
1296 for event in data_array {
1297 // Extract reactivator timestamps from top-level event fields
1298 let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1299 let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1300
1301 // Extract operation type (op field: "i", "u", "d")
1302 let op = event["op"]
1303 .as_str()
1304 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1305
1306 // Extract payload
1307 let payload = &event["payload"];
1308 if payload.is_null() {
1309 return Err(anyhow::anyhow!("Missing 'payload' field"));
1310 }
1311
1312 // Extract element type from payload.source.table
1313 let element_type = payload["source"]["table"]
1314 .as_str()
1315 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1316
1317 // Get element data based on operation
1318 let element_data = match op {
1319 "i" | "u" => &payload["after"],
1320 "d" => &payload["before"],
1321 _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1322 };
1323
1324 if element_data.is_null() {
1325 return Err(anyhow::anyhow!(
1326 "Missing element data (after/before) for operation {op}"
1327 ));
1328 }
1329
1330 // Extract element ID
1331 let element_id = element_data["id"]
1332 .as_str()
1333 .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1334
1335 // Extract labels
1336 let labels_array = element_data["labels"]
1337 .as_array()
1338 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1339
1340 let labels: Vec<Arc<str>> = labels_array
1341 .iter()
1342 .filter_map(|v| v.as_str().map(Arc::from))
1343 .collect();
1344
1345 if labels.is_empty() {
1346 return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1347 }
1348
1349 // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1350 let ts_ns = payload["source"]["ts_ns"]
1351 .as_u64()
1352 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1353 let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1354
1355 // Build ElementMetadata
1356 let reference = ElementReference::new(source_id, element_id);
1357 let metadata = ElementMetadata {
1358 reference,
1359 labels: labels.into(),
1360 effective_from,
1361 };
1362
1363 // Handle delete operation (no properties needed)
1364 if op == "d" {
1365 source_changes.push(SourceChangeWithTimestamps {
1366 source_change: SourceChange::Delete { metadata },
1367 reactivator_start_ns,
1368 reactivator_end_ns,
1369 });
1370 continue;
1371 }
1372
1373 // Convert properties
1374 let properties_obj = element_data["properties"]
1375 .as_object()
1376 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1377
1378 let properties = convert_json_to_element_properties(properties_obj);
1379
1380 // Build element based on type
1381 let element = match element_type {
1382 "node" => Element::Node {
1383 metadata,
1384 properties,
1385 },
1386 "rel" | "relation" => {
1387 // Extract startId and endId
1388 let start_id = element_data["startId"]
1389 .as_str()
1390 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1391 let end_id = element_data["endId"]
1392 .as_str()
1393 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1394
1395 Element::Relation {
1396 metadata,
1397 properties,
1398 in_node: ElementReference::new(source_id, start_id),
1399 out_node: ElementReference::new(source_id, end_id),
1400 }
1401 }
1402 _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1403 };
1404
1405 // Build SourceChange
1406 let source_change = match op {
1407 "i" => SourceChange::Insert { element },
1408 "u" => SourceChange::Update { element },
1409 _ => unreachable!(),
1410 };
1411
1412 source_changes.push(SourceChangeWithTimestamps {
1413 source_change,
1414 reactivator_start_ns,
1415 reactivator_end_ns,
1416 });
1417 }
1418
1419 Ok(source_changes)
1420}
1421
1422/// Transform CloudEvent-wrapped control event to SourceControl(s)
1423///
1424/// Handles control messages from Query API service with source.db = "Drasi"
1425/// Currently supports "SourceSubscription" control type
1426fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1427 let mut control_events = Vec::new();
1428
1429 // Extract the data array from CloudEvent wrapper
1430 let data_array = cloud_event["data"]
1431 .as_array()
1432 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1433
1434 // Check if control type is supported
1435 if control_type != "SourceSubscription" {
1436 info!(
1437 "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1438 );
1439 return Ok(control_events); // Return empty vec for unknown types
1440 }
1441
1442 // Process each event in the data array
1443 for event in data_array {
1444 // Extract operation type (op field: "i", "u", "d")
1445 let op = event["op"]
1446 .as_str()
1447 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1448
1449 // Extract payload
1450 let payload = &event["payload"];
1451 if payload.is_null() {
1452 warn!("Missing 'payload' field in control event, skipping");
1453 continue;
1454 }
1455
1456 // Get data based on operation
1457 let control_data = match op {
1458 "i" | "u" => &payload["after"],
1459 "d" => &payload["before"],
1460 _ => {
1461 warn!("Unknown operation type in control event: {op}, skipping");
1462 continue;
1463 }
1464 };
1465
1466 if control_data.is_null() {
1467 warn!("Missing control data (after/before) for operation {op}, skipping");
1468 continue;
1469 }
1470
1471 // Extract required fields for SourceSubscription
1472 let query_id = match control_data["queryId"].as_str() {
1473 Some(id) => id.to_string(),
1474 None => {
1475 warn!("Missing required 'queryId' field in control event, skipping");
1476 continue;
1477 }
1478 };
1479
1480 let query_node_id = match control_data["queryNodeId"].as_str() {
1481 Some(id) => id.to_string(),
1482 None => {
1483 warn!("Missing required 'queryNodeId' field in control event, skipping");
1484 continue;
1485 }
1486 };
1487
1488 // Extract optional label arrays (default to empty if missing)
1489 let node_labels = control_data["nodeLabels"]
1490 .as_array()
1491 .map(|arr| {
1492 arr.iter()
1493 .filter_map(|v| v.as_str().map(String::from))
1494 .collect()
1495 })
1496 .unwrap_or_default();
1497
1498 let rel_labels = control_data["relLabels"]
1499 .as_array()
1500 .map(|arr| {
1501 arr.iter()
1502 .filter_map(|v| v.as_str().map(String::from))
1503 .collect()
1504 })
1505 .unwrap_or_default();
1506
1507 // Map operation to ControlOperation
1508 let operation = match op {
1509 "i" => ControlOperation::Insert,
1510 "u" => ControlOperation::Update,
1511 "d" => ControlOperation::Delete,
1512 _ => unreachable!(), // Already filtered above
1513 };
1514
1515 // Build SourceControl::Subscription
1516 let control_event = SourceControl::Subscription {
1517 query_id,
1518 query_node_id,
1519 node_labels,
1520 rel_labels,
1521 operation,
1522 };
1523
1524 control_events.push(control_event);
1525 }
1526
1527 Ok(control_events)
1528}
1529
/// Dynamic plugin entry point.
///
/// Exports `PlatformSourceDescriptor` through the plugin SDK when the
/// `dynamic-plugin` feature is enabled.
1533#[cfg(feature = "dynamic-plugin")]
1534drasi_plugin_sdk::export_plugin!(
1535 plugin_id = "platform-source",
1536 core_version = env!("CARGO_PKG_VERSION"),
1537 lib_version = env!("CARGO_PKG_VERSION"),
1538 plugin_version = env!("CARGO_PKG_VERSION"),
1539 source_descriptors = [descriptor::PlatformSourceDescriptor],
1540 reaction_descriptors = [],
1541 bootstrap_descriptors = [],
1542);