Skip to main content

drasi_source_platform/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(unexpected_cfgs)]
16
17//! Platform Source Plugin for Drasi
18//!
19//! This plugin consumes data change events from Redis Streams, which is the primary
20//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
21//! containing node and relation changes, as well as control events for query subscriptions.
22//!
23//! # Architecture
24//!
25//! The platform source connects to Redis as a consumer group member, enabling:
26//! - **At-least-once delivery**: Messages are acknowledged after processing
27//! - **Horizontal scaling**: Multiple consumers can share the workload
28//! - **Fault tolerance**: Unacknowledged messages are redelivered
29//!
30//! # Configuration
31//!
32//! | Field | Type | Default | Description |
33//! |-------|------|---------|-------------|
34//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
35//! | `stream_key` | string | *required* | Redis stream key to consume from |
36//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
37//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
38//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
39//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
40//!
41//! # Data Format
42//!
43//! The platform source expects CloudEvent-wrapped messages with a `data` array
44//! containing change events. Each event includes an operation type and payload.
45//!
46//! ## Node Insert
47//!
48//! ```json
49//! {
50//!     "data": [{
51//!         "op": "i",
52//!         "payload": {
53//!             "after": {
54//!                 "id": "user-123",
55//!                 "labels": ["User"],
56//!                 "properties": {
57//!                     "name": "Alice",
58//!                     "email": "alice@example.com"
59//!                 }
60//!             },
61//!             "source": {
62//!                 "db": "mydb",
63//!                 "table": "node",
64//!                 "ts_ns": 1699900000000000000
65//!             }
66//!         }
67//!     }]
68//! }
69//! ```
70//!
71//! ## Node Update
72//!
73//! ```json
74//! {
75//!     "data": [{
76//!         "op": "u",
77//!         "payload": {
78//!             "after": {
79//!                 "id": "user-123",
80//!                 "labels": ["User"],
81//!                 "properties": { "name": "Alice Updated" }
82//!             },
83//!             "source": { "table": "node", "ts_ns": 1699900001000000000 }
84//!         }
85//!     }]
86//! }
87//! ```
88//!
89//! ## Node Delete
90//!
91//! ```json
92//! {
93//!     "data": [{
94//!         "op": "d",
95//!         "payload": {
96//!             "before": {
97//!                 "id": "user-123",
98//!                 "labels": ["User"],
99//!                 "properties": {}
100//!             },
101//!             "source": { "table": "node", "ts_ns": 1699900002000000000 }
102//!         }
103//!     }]
104//! }
105//! ```
106//!
107//! ## Relation Insert
108//!
109//! ```json
110//! {
111//!     "data": [{
112//!         "op": "i",
113//!         "payload": {
114//!             "after": {
115//!                 "id": "follows-1",
116//!                 "labels": ["FOLLOWS"],
117//!                 "startId": "user-123",
118//!                 "endId": "user-456",
119//!                 "properties": { "since": "2024-01-01" }
120//!             },
121//!             "source": { "table": "rel", "ts_ns": 1699900003000000000 }
122//!         }
123//!     }]
124//! }
125//! ```
126//!
127//! # Control Events
128//!
129//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
130//! Currently supported control types:
131//!
132//! - **SourceSubscription**: Query subscription management
133//!
134//! # Example Configuration (YAML)
135//!
136//! ```yaml
137//! source_type: platform
138//! properties:
139//!   redis_url: "redis://localhost:6379"
140//!   stream_key: "my-app-changes"
141//!   consumer_group: "drasi-consumers"
142//!   batch_size: 50
143//!   block_ms: 10000
144//! ```
145//!
146//! # Usage Example
147//!
148//! ```rust,ignore
149//! use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
150//! use std::sync::Arc;
151//!
152//! let config = PlatformSourceBuilder::new()
153//!     .with_redis_url("redis://localhost:6379")
154//!     .with_stream_key("my-changes")
155//!     .with_consumer_group("my-consumers")
156//!     .build();
157//!
158//! let source = Arc::new(PlatformSource::new("platform-source", config)?);
159//! drasi.add_source(source).await?;
160//! ```
161
162pub mod config;
163pub mod descriptor;
164pub use config::PlatformSourceConfig;
165
166use anyhow::Result;
167use log::{debug, error, info, warn};
168use redis::streams::StreamReadReply;
169use serde_json::Value;
170use std::collections::HashMap;
171use std::sync::Arc;
172use std::time::Duration;
173use tokio::sync::RwLock;
174use tokio::task::JoinHandle;
175
176use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
177use drasi_lib::channels::{
178    ComponentStatus, ControlOperation, DispatchMode, SourceControl, SourceEvent,
179    SourceEventWrapper, SubscriptionResponse,
180};
181use drasi_lib::component_graph::ComponentStatusHandle;
182use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
183use drasi_lib::sources::manager::convert_json_to_element_properties;
184use drasi_lib::Source;
185use tracing::Instrument;
186
187#[cfg(test)]
188mod tests;
189
190/// Configuration for the platform source
191#[derive(Debug, Clone)]
192struct PlatformConfig {
193    /// Redis connection URL
194    redis_url: String,
195    /// Redis stream key to read from
196    stream_key: String,
197    /// Consumer group name
198    consumer_group: String,
199    /// Consumer name (should be unique per instance)
200    consumer_name: String,
201    /// Number of events to read per XREADGROUP call
202    batch_size: usize,
203    /// Milliseconds to block waiting for events
204    block_ms: u64,
205    /// Stream position to start from (">" for new, "0" for all)
206    start_id: String,
207    /// Always recreate consumer group on startup (default: false)
208    /// If true, deletes and recreates the consumer group using start_id
209    /// If false, uses existing group position (ignores start_id if group exists)
210    always_create_consumer_group: bool,
211    /// Maximum connection retry attempts
212    max_retries: usize,
213    /// Delay between retries in milliseconds
214    retry_delay_ms: u64,
215}
216
217impl Default for PlatformConfig {
218    fn default() -> Self {
219        Self {
220            redis_url: String::new(),
221            stream_key: String::new(),
222            consumer_group: String::new(),
223            consumer_name: String::new(),
224            batch_size: 10,
225            block_ms: 5000,
226            start_id: ">".to_string(),
227            always_create_consumer_group: false,
228            max_retries: 3,
229            retry_delay_ms: 1000,
230        }
231    }
232}
233
234/// Platform source that reads events from Redis Streams.
235///
236/// This source connects to a Redis instance and consumes CloudEvent-wrapped
237/// messages from a stream using consumer groups. It supports both data events
238/// (node/relation changes) and control events (query subscriptions).
239///
240/// # Fields
241///
242/// - `base`: Common source functionality (dispatchers, status, lifecycle)
243/// - `config`: Platform-specific configuration (Redis connection, stream settings)
244pub struct PlatformSource {
245    /// Base source implementation providing common functionality
246    base: SourceBase,
247    /// Platform source configuration
248    config: PlatformSourceConfig,
249}
250
251/// Builder for creating [`PlatformSource`] instances.
252///
253/// Provides a fluent API for constructing platform sources
254/// with sensible defaults.
255///
256/// # Example
257///
258/// ```rust,ignore
259/// use drasi_source_platform::PlatformSource;
260///
261/// let source = PlatformSource::builder("my-platform-source")
262///     .with_redis_url("redis://localhost:6379")
263///     .with_stream_key("my-app-changes")
264///     .with_consumer_group("my-consumers")
265///     .with_batch_size(50)
266///     .build()?;
267/// ```
268pub struct PlatformSourceBuilder {
269    id: String,
270    redis_url: String,
271    stream_key: String,
272    consumer_group: Option<String>,
273    consumer_name: Option<String>,
274    batch_size: Option<usize>,
275    block_ms: Option<u64>,
276    dispatch_mode: Option<DispatchMode>,
277    dispatch_buffer_capacity: Option<usize>,
278    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
279    auto_start: bool,
280}
281
282impl PlatformSourceBuilder {
283    /// Create a new builder with the given ID and default values.
284    pub fn new(id: impl Into<String>) -> Self {
285        Self {
286            id: id.into(),
287            redis_url: String::new(),
288            stream_key: String::new(),
289            consumer_group: None,
290            consumer_name: None,
291            batch_size: None,
292            block_ms: None,
293            dispatch_mode: None,
294            dispatch_buffer_capacity: None,
295            bootstrap_provider: None,
296            auto_start: true,
297        }
298    }
299
300    /// Set the Redis connection URL.
301    ///
302    /// # Arguments
303    ///
304    /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
305    pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
306        self.redis_url = url.into();
307        self
308    }
309
310    /// Set the Redis stream key to consume from.
311    ///
312    /// # Arguments
313    ///
314    /// * `key` - Name of the Redis stream
315    pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
316        self.stream_key = key.into();
317        self
318    }
319
320    /// Set the consumer group name.
321    ///
322    /// # Arguments
323    ///
324    /// * `group` - Consumer group name (default: `"drasi-core"`)
325    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
326        self.consumer_group = Some(group.into());
327        self
328    }
329
330    /// Set a unique consumer name within the group.
331    ///
332    /// # Arguments
333    ///
334    /// * `name` - Unique consumer name (auto-generated if not specified)
335    pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
336        self.consumer_name = Some(name.into());
337        self
338    }
339
340    /// Set the batch size for reading events.
341    ///
342    /// # Arguments
343    ///
344    /// * `size` - Number of events to read per XREADGROUP call (default: 100)
345    pub fn with_batch_size(mut self, size: usize) -> Self {
346        self.batch_size = Some(size);
347        self
348    }
349
350    /// Set the block timeout for waiting on events.
351    ///
352    /// # Arguments
353    ///
354    /// * `ms` - Milliseconds to block waiting for events (default: 5000)
355    pub fn with_block_ms(mut self, ms: u64) -> Self {
356        self.block_ms = Some(ms);
357        self
358    }
359
360    /// Set the dispatch mode for this source
361    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
362        self.dispatch_mode = Some(mode);
363        self
364    }
365
366    /// Set the dispatch buffer capacity for this source
367    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
368        self.dispatch_buffer_capacity = Some(capacity);
369        self
370    }
371
372    /// Set the bootstrap provider for this source
373    pub fn with_bootstrap_provider(
374        mut self,
375        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
376    ) -> Self {
377        self.bootstrap_provider = Some(Box::new(provider));
378        self
379    }
380
381    /// Set whether this source should auto-start when DrasiLib starts.
382    ///
383    /// Default is `true`. Set to `false` if this source should only be
384    /// started manually via `start_source()`.
385    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
386        self.auto_start = auto_start;
387        self
388    }
389
390    /// Set the full configuration at once
391    pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
392        self.redis_url = config.redis_url;
393        self.stream_key = config.stream_key;
394        self.consumer_group = Some(config.consumer_group);
395        self.consumer_name = config.consumer_name;
396        self.batch_size = Some(config.batch_size);
397        self.block_ms = Some(config.block_ms);
398        self
399    }
400
401    /// Build the platform source.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the source cannot be constructed.
406    pub fn build(self) -> Result<PlatformSource> {
407        let config = PlatformSourceConfig {
408            redis_url: self.redis_url,
409            stream_key: self.stream_key,
410            consumer_group: self
411                .consumer_group
412                .unwrap_or_else(|| "drasi-core".to_string()),
413            consumer_name: self.consumer_name,
414            batch_size: self.batch_size.unwrap_or(100),
415            block_ms: self.block_ms.unwrap_or(5000),
416        };
417
418        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
419        if let Some(mode) = self.dispatch_mode {
420            params = params.with_dispatch_mode(mode);
421        }
422        if let Some(capacity) = self.dispatch_buffer_capacity {
423            params = params.with_dispatch_buffer_capacity(capacity);
424        }
425        if let Some(provider) = self.bootstrap_provider {
426            params = params.with_bootstrap_provider(provider);
427        }
428
429        Ok(PlatformSource {
430            base: SourceBase::new(params)?,
431            config,
432        })
433    }
434}
435
436impl PlatformSource {
437    /// Create a builder for PlatformSource
438    ///
439    /// # Example
440    ///
441    /// ```rust,ignore
442    /// use drasi_source_platform::PlatformSource;
443    ///
444    /// let source = PlatformSource::builder("my-platform-source")
445    ///     .with_redis_url("redis://localhost:6379")
446    ///     .with_stream_key("my-changes")
447    ///     .with_bootstrap_provider(my_provider)
448    ///     .build()?;
449    /// ```
450    pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
451        PlatformSourceBuilder::new(id)
452    }
453
454    /// Create a new platform source.
455    ///
456    /// The event channel is automatically injected when the source is added
457    /// to DrasiLib via `add_source()`.
458    ///
459    /// # Arguments
460    ///
461    /// * `id` - Unique identifier for this source instance
462    /// * `config` - Platform source configuration
463    ///
464    /// # Returns
465    ///
466    /// A new `PlatformSource` instance, or an error if construction fails.
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the base source cannot be initialized.
471    ///
472    /// # Example
473    ///
474    /// ```rust,ignore
475    /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
476    ///
477    /// let config = PlatformSourceBuilder::new("my-platform-source")
478    ///     .with_redis_url("redis://localhost:6379")
479    ///     .with_stream_key("my-changes")
480    ///     .build()?;
481    /// ```
482    pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
483        let id = id.into();
484        let params = SourceBaseParams::new(id);
485        Ok(Self {
486            base: SourceBase::new(params)?,
487            config,
488        })
489    }
490
491    /// Subscribe to source events (for testing)
492    ///
493    /// This method is intended for use in tests to receive events broadcast by the source.
494    /// In production, queries subscribe to sources through the SourceManager.
495    /// Parse configuration from properties
496    #[allow(dead_code)]
497    fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
498        // Extract required fields first
499        let redis_url = properties
500            .get("redis_url")
501            .and_then(|v| v.as_str())
502            .ok_or_else(|| {
503                anyhow::anyhow!(
504                    "Configuration error: Missing required field 'redis_url'. \
505                 Platform source requires a Redis connection URL"
506                )
507            })?
508            .to_string();
509
510        let stream_key = properties
511            .get("stream_key")
512            .and_then(|v| v.as_str())
513            .ok_or_else(|| {
514                anyhow::anyhow!(
515                    "Configuration error: Missing required field 'stream_key'. \
516                 Platform source requires a Redis Stream key to read from"
517                )
518            })?
519            .to_string();
520
521        let consumer_group = properties
522            .get("consumer_group")
523            .and_then(|v| v.as_str())
524            .ok_or_else(|| {
525                anyhow::anyhow!(
526                    "Configuration error: Missing required field 'consumer_group'. \
527                 Platform source requires a consumer group name"
528                )
529            })?
530            .to_string();
531
532        let consumer_name = properties
533            .get("consumer_name")
534            .and_then(|v| v.as_str())
535            .ok_or_else(|| {
536                anyhow::anyhow!(
537                    "Configuration error: Missing required field 'consumer_name'. \
538                 Platform source requires a unique consumer name"
539                )
540            })?
541            .to_string();
542
543        // Get defaults for optional field handling
544        let defaults = PlatformConfig::default();
545
546        // Build config with optional fields
547        let config = PlatformConfig {
548            redis_url,
549            stream_key,
550            consumer_group,
551            consumer_name,
552            batch_size: properties
553                .get("batch_size")
554                .and_then(|v| v.as_u64())
555                .map(|v| v as usize)
556                .unwrap_or(defaults.batch_size),
557            block_ms: properties
558                .get("block_ms")
559                .and_then(|v| v.as_u64())
560                .unwrap_or(defaults.block_ms),
561            start_id: properties
562                .get("start_id")
563                .and_then(|v| v.as_str())
564                .map(|s| s.to_string())
565                .unwrap_or(defaults.start_id),
566            always_create_consumer_group: properties
567                .get("always_create_consumer_group")
568                .and_then(|v| v.as_bool())
569                .unwrap_or(defaults.always_create_consumer_group),
570            max_retries: properties
571                .get("max_retries")
572                .and_then(|v| v.as_u64())
573                .map(|v| v as usize)
574                .unwrap_or(defaults.max_retries),
575            retry_delay_ms: properties
576                .get("retry_delay_ms")
577                .and_then(|v| v.as_u64())
578                .unwrap_or(defaults.retry_delay_ms),
579        };
580
581        // Validate
582        if config.redis_url.is_empty() {
583            return Err(anyhow::anyhow!(
584                "Validation error: redis_url cannot be empty. \
585                 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
586            ));
587        }
588        if config.stream_key.is_empty() {
589            return Err(anyhow::anyhow!(
590                "Validation error: stream_key cannot be empty. \
591                 Please specify the Redis Stream key to read from"
592            ));
593        }
594        if config.consumer_group.is_empty() {
595            return Err(anyhow::anyhow!(
596                "Validation error: consumer_group cannot be empty. \
597                 Please specify a consumer group name for this source"
598            ));
599        }
600        if config.consumer_name.is_empty() {
601            return Err(anyhow::anyhow!(
602                "Validation error: consumer_name cannot be empty. \
603                 Please specify a unique consumer name within the consumer group"
604            ));
605        }
606
607        Ok(config)
608    }
609
610    /// Connect to Redis with retry logic
611    async fn connect_with_retry(
612        redis_url: &str,
613        max_retries: usize,
614        retry_delay_ms: u64,
615    ) -> Result<redis::aio::MultiplexedConnection> {
616        let client = redis::Client::open(redis_url)?;
617        let mut delay = retry_delay_ms;
618
619        for attempt in 0..max_retries {
620            match client.get_multiplexed_async_connection().await {
621                Ok(conn) => {
622                    info!("Successfully connected to Redis");
623                    return Ok(conn);
624                }
625                Err(e) if attempt < max_retries - 1 => {
626                    warn!(
627                        "Redis connection failed (attempt {}/{}): {}",
628                        attempt + 1,
629                        max_retries,
630                        e
631                    );
632                    tokio::time::sleep(Duration::from_millis(delay)).await;
633                    delay *= 2; // Exponential backoff
634                }
635                Err(e) => {
636                    return Err(anyhow::anyhow!(
637                        "Failed to connect to Redis after {max_retries} attempts: {e}"
638                    ));
639                }
640            }
641        }
642
643        unreachable!()
644    }
645
646    /// Create or recreate consumer group based on configuration
647    async fn create_consumer_group(
648        conn: &mut redis::aio::MultiplexedConnection,
649        stream_key: &str,
650        consumer_group: &str,
651        start_id: &str,
652        always_create: bool,
653    ) -> Result<()> {
654        // Determine the initial position for the consumer group
655        let group_start_id = if start_id == ">" {
656            "$" // ">" means only new messages, so create group at end
657        } else {
658            start_id // "0" or specific ID
659        };
660
661        // If always_create is true, delete the existing group first
662        if always_create {
663            info!(
664                "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
665            );
666
667            let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
668                .arg("DESTROY")
669                .arg(stream_key)
670                .arg(consumer_group)
671                .query_async(conn)
672                .await;
673
674            match destroy_result {
675                Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
676                Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
677                Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
678                Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
679            }
680        }
681
682        // Try to create the consumer group
683        let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
684            .arg("CREATE")
685            .arg(stream_key)
686            .arg(consumer_group)
687            .arg(group_start_id)
688            .arg("MKSTREAM")
689            .query_async(conn)
690            .await;
691
692        match result {
693            Ok(_) => {
694                info!(
695                    "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
696                );
697                Ok(())
698            }
699            Err(e) => {
700                // BUSYGROUP error means the group already exists
701                if e.to_string().contains("BUSYGROUP") {
702                    info!(
703                        "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
704                    );
705                    Ok(())
706                } else {
707                    Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
708                }
709            }
710        }
711    }
712
713    /// Start the stream consumer task
714    async fn start_consumer_task(
715        source_id: String,
716        instance_id: String,
717        platform_config: PlatformConfig,
718        dispatchers: Arc<
719            RwLock<
720                Vec<
721                    Box<
722                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
723                    >,
724                >,
725            >,
726        >,
727        reporter: ComponentStatusHandle,
728    ) -> JoinHandle<()> {
729        let source_id_for_span = source_id.clone();
730        let span = tracing::info_span!(
731            "platform_source_consumer",
732            instance_id = %instance_id,
733            component_id = %source_id_for_span,
734            component_type = "source"
735        );
736        tokio::spawn(async move {
737            info!(
738                "Starting platform source consumer for source '{}' on stream '{}'",
739                source_id, platform_config.stream_key
740            );
741
742            // Connect to Redis
743            let mut conn = match Self::connect_with_retry(
744                &platform_config.redis_url,
745                platform_config.max_retries,
746                platform_config.retry_delay_ms,
747            )
748            .await
749            {
750                Ok(conn) => conn,
751                Err(e) => {
752                    error!("Failed to connect to Redis: {e}");
753                    reporter.set_status(
754                        ComponentStatus::Stopped,
755                        Some(format!("Failed to connect to Redis: {e}")),
756                    ).await;
757                    return;
758                }
759            };
760
761            // Create consumer group
762            if let Err(e) = Self::create_consumer_group(
763                &mut conn,
764                &platform_config.stream_key,
765                &platform_config.consumer_group,
766                &platform_config.start_id,
767                platform_config.always_create_consumer_group,
768            )
769            .await
770            {
771                error!("Failed to create consumer group: {e}");
772                reporter.set_status(
773                    ComponentStatus::Stopped,
774                    Some(format!("Failed to create consumer group: {e}")),
775                ).await;
776                return;
777            }
778
779            // Main consumer loop
780            loop {
781                // Read from stream using ">" to get next undelivered messages for this consumer group
782                let read_result: Result<StreamReadReply, redis::RedisError> =
783                    redis::cmd("XREADGROUP")
784                        .arg("GROUP")
785                        .arg(&platform_config.consumer_group)
786                        .arg(&platform_config.consumer_name)
787                        .arg("COUNT")
788                        .arg(platform_config.batch_size)
789                        .arg("BLOCK")
790                        .arg(platform_config.block_ms)
791                        .arg("STREAMS")
792                        .arg(&platform_config.stream_key)
793                        .arg(">") // Always use ">" for consumer group reads
794                        .query_async(&mut conn)
795                        .await;
796
797                match read_result {
798                    Ok(reply) => {
799                        // Collect all stream IDs for batch acknowledgment
800                        let mut all_stream_ids = Vec::new();
801
802                        // Process each stream entry
803                        for stream_key in reply.keys {
804                            for stream_id in stream_key.ids {
805                                debug!("Received event from stream: {}", stream_id.id);
806
807                                // Store stream ID for batch acknowledgment
808                                all_stream_ids.push(stream_id.id.clone());
809
810                                // Extract event data
811                                match extract_event_data(&stream_id.map) {
812                                    Ok(event_json) => {
813                                        // Parse JSON
814                                        match serde_json::from_str::<Value>(&event_json) {
815                                            Ok(cloud_event) => {
816                                                // Detect message type
817                                                let message_type =
818                                                    detect_message_type(&cloud_event);
819
820                                                match message_type {
821                                                    MessageType::Control(control_type) => {
822                                                        // Handle control message
823                                                        debug!(
824                                                            "Detected control message of type: {control_type}"
825                                                        );
826
827                                                        match transform_control_event(
828                                                            cloud_event,
829                                                            &control_type,
830                                                        ) {
831                                                            Ok(control_events) => {
832                                                                // Publish control events
833                                                                for control_event in control_events
834                                                                {
835                                                                    // Create profiling metadata with timestamps
836                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
837                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
838
839                                                                    let wrapper = SourceEventWrapper::with_profiling(
840                                                                        source_id.clone(),
841                                                                        SourceEvent::Control(control_event),
842                                                                        chrono::Utc::now(),
843                                                                        profiling,
844                                                                    );
845
846                                                                    // Dispatch via helper
847                                                                    if let Err(e) = SourceBase::dispatch_from_task(
848                                                                        dispatchers.clone(),
849                                                                        wrapper,
850                                                                        &source_id,
851                                                                    )
852                                                                    .await
853                                                                    {
854                                                                        debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
855                                                                    } else {
856                                                                        debug!(
857                                                                            "Published control event for stream {}",
858                                                                            stream_id.id
859                                                                        );
860                                                                    }
861                                                                }
862                                                            }
863                                                            Err(e) => {
864                                                                warn!(
865                                                                    "Failed to transform control event {}: {}",
866                                                                    stream_id.id, e
867                                                                );
868                                                            }
869                                                        }
870                                                    }
871                                                    MessageType::Data => {
872                                                        // Handle data message
873                                                        match transform_platform_event(
874                                                            cloud_event,
875                                                            &source_id,
876                                                        ) {
877                                                            Ok(source_changes_with_timestamps) => {
878                                                                // Publish source changes
879                                                                for item in
880                                                                    source_changes_with_timestamps
881                                                                {
882                                                                    // Create profiling metadata with timestamps
883                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
884                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
885
886                                                                    // Extract source_ns from SourceChange transaction time
887                                                                    profiling.source_ns = Some(
888                                                                        item.source_change
889                                                                            .get_transaction_time(),
890                                                                    );
891
892                                                                    // Set reactivator timestamps from event
893                                                                    profiling
894                                                                        .reactivator_start_ns =
895                                                                        item.reactivator_start_ns;
896                                                                    profiling.reactivator_end_ns =
897                                                                        item.reactivator_end_ns;
898
899                                                                    let wrapper = SourceEventWrapper::with_profiling(
900                                                                        source_id.clone(),
901                                                                        SourceEvent::Change(item.source_change),
902                                                                        chrono::Utc::now(),
903                                                                        profiling,
904                                                                    );
905
906                                                                    // Dispatch via helper
907                                                                    if let Err(e) = SourceBase::dispatch_from_task(
908                                                                        dispatchers.clone(),
909                                                                        wrapper,
910                                                                        &source_id,
911                                                                    )
912                                                                    .await
913                                                                    {
914                                                                        debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
915                                                                    } else {
916                                                                        debug!(
917                                                                            "Published source change for event {}",
918                                                                            stream_id.id
919                                                                        );
920                                                                    }
921                                                                }
922                                                            }
923                                                            Err(e) => {
924                                                                warn!(
925                                                                    "Failed to transform event {}: {}",
926                                                                    stream_id.id, e
927                                                                );
928                                                                reporter.set_status(
929                                                                    ComponentStatus::Running,
930                                                                    Some(format!(
931                                                                        "Transformation error: {e}"
932                                                                    )),
933                                                                ).await;
934                                                            }
935                                                        }
936                                                    }
937                                                }
938                                            }
939                                            Err(e) => {
940                                                warn!(
941                                                    "Failed to parse JSON for event {}: {}",
942                                                    stream_id.id, e
943                                                );
944                                            }
945                                        }
946                                    }
947                                    Err(e) => {
948                                        warn!(
949                                            "Failed to extract event data from {}: {}",
950                                            stream_id.id, e
951                                        );
952                                    }
953                                }
954                            }
955                        }
956
957                        // Batch acknowledge all messages at once
958                        if !all_stream_ids.is_empty() {
959                            debug!("Acknowledging batch of {} messages", all_stream_ids.len());
960
961                            let mut cmd = redis::cmd("XACK");
962                            cmd.arg(&platform_config.stream_key)
963                                .arg(&platform_config.consumer_group);
964
965                            // Add all stream IDs to the command
966                            for stream_id in &all_stream_ids {
967                                cmd.arg(stream_id);
968                            }
969
970                            match cmd.query_async::<_, i64>(&mut conn).await {
971                                Ok(ack_count) => {
972                                    debug!("Successfully acknowledged {ack_count} messages");
973                                    if ack_count as usize != all_stream_ids.len() {
974                                        warn!(
975                                            "Acknowledged {} messages but expected {}",
976                                            ack_count,
977                                            all_stream_ids.len()
978                                        );
979                                    }
980                                }
981                                Err(e) => {
982                                    error!("Failed to acknowledge message batch: {e}");
983
984                                    // Fallback: Try acknowledging messages individually
985                                    warn!("Falling back to individual acknowledgments");
986                                    for stream_id in &all_stream_ids {
987                                        match redis::cmd("XACK")
988                                            .arg(&platform_config.stream_key)
989                                            .arg(&platform_config.consumer_group)
990                                            .arg(stream_id)
991                                            .query_async::<_, i64>(&mut conn)
992                                            .await
993                                        {
994                                            Ok(_) => {
995                                                debug!(
996                                                    "Individually acknowledged message {stream_id}"
997                                                );
998                                            }
999                                            Err(e) => {
1000                                                error!(
1001                                                    "Failed to individually acknowledge message {stream_id}: {e}"
1002                                                );
1003                                            }
1004                                        }
1005                                    }
1006                                }
1007                            }
1008                        }
1009                    }
1010                    Err(e) => {
1011                        // Check if it's a connection error
1012                        if is_connection_error(&e) {
1013                            error!("Redis connection lost: {e}");
1014                            reporter.set_status(
1015                                ComponentStatus::Running,
1016                                Some(format!("Redis connection lost: {e}")),
1017                            ).await;
1018
1019                            // Try to reconnect
1020                            match Self::connect_with_retry(
1021                                &platform_config.redis_url,
1022                                platform_config.max_retries,
1023                                platform_config.retry_delay_ms,
1024                            )
1025                            .await
1026                            {
1027                                Ok(new_conn) => {
1028                                    conn = new_conn;
1029                                    info!("Reconnected to Redis");
1030                                }
1031                                Err(e) => {
1032                                    error!("Failed to reconnect to Redis: {e}");
1033                                    reporter.set_status(ComponentStatus::Stopped, None).await;
1034                                    return;
1035                                }
1036                            }
1037                        } else if !e.to_string().contains("timeout") {
1038                            // Log non-timeout errors
1039                            error!("Error reading from stream: {e}");
1040                        }
1041                    }
1042                }
1043            }
1044        }.instrument(span))
1045    }
1046}
1047
1048#[async_trait::async_trait]
1049impl Source for PlatformSource {
1050    fn id(&self) -> &str {
1051        &self.base.id
1052    }
1053
1054    fn type_name(&self) -> &str {
1055        "platform"
1056    }
1057
1058    fn properties(&self) -> HashMap<String, serde_json::Value> {
1059        use crate::descriptor::PlatformSourceConfigDto;
1060        use drasi_plugin_sdk::ConfigValue;
1061
1062        let dto = PlatformSourceConfigDto {
1063            redis_url: ConfigValue::Static(self.config.redis_url.clone()),
1064            stream_key: ConfigValue::Static(self.config.stream_key.clone()),
1065            consumer_group: ConfigValue::Static(self.config.consumer_group.clone()),
1066            consumer_name: self
1067                .config
1068                .consumer_name
1069                .as_ref()
1070                .map(|n| ConfigValue::Static(n.clone())),
1071            batch_size: ConfigValue::Static(self.config.batch_size),
1072            block_ms: ConfigValue::Static(self.config.block_ms),
1073        };
1074
1075        match serde_json::to_value(&dto) {
1076            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
1077            _ => HashMap::new(),
1078        }
1079    }
1080
1081    fn auto_start(&self) -> bool {
1082        self.base.get_auto_start()
1083    }
1084
1085    async fn start(&self) -> Result<()> {
1086        info!("Starting platform source: {}", self.base.id);
1087
1088        // Extract configuration from typed config
1089        let platform_config = PlatformConfig {
1090            redis_url: self.config.redis_url.clone(),
1091            stream_key: self.config.stream_key.clone(),
1092            consumer_group: self.config.consumer_group.clone(),
1093            consumer_name: self
1094                .config
1095                .consumer_name
1096                .clone()
1097                .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1098            batch_size: self.config.batch_size,
1099            block_ms: self.config.block_ms,
1100            start_id: ">".to_string(),
1101            always_create_consumer_group: false,
1102            max_retries: 5,
1103            retry_delay_ms: 1000,
1104        };
1105
1106        // Update status
1107        self.base
1108            .set_status(
1109                ComponentStatus::Running,
1110                Some("Platform source running".to_string()),
1111            )
1112            .await;
1113
1114        // Get instance_id from context for log routing isolation
1115        let instance_id = self
1116            .base
1117            .context()
1118            .await
1119            .map(|c| c.instance_id)
1120            .unwrap_or_default();
1121
1122        // Start consumer task
1123        let task = Self::start_consumer_task(
1124            self.base.id.clone(),
1125            instance_id,
1126            platform_config,
1127            self.base.dispatchers.clone(),
1128            self.base.status_handle(),
1129        )
1130        .await;
1131
1132        *self.base.task_handle.write().await = Some(task);
1133
1134        Ok(())
1135    }
1136
1137    async fn stop(&self) -> Result<()> {
1138        info!("Stopping platform source: {}", self.base.id);
1139
1140        // Cancel the task
1141        if let Some(handle) = self.base.task_handle.write().await.take() {
1142            handle.abort();
1143        }
1144
1145        // Update status
1146        self.base
1147            .set_status(
1148                ComponentStatus::Stopped,
1149                Some("Platform source stopped".to_string()),
1150            )
1151            .await;
1152
1153        Ok(())
1154    }
1155
1156    async fn status(&self) -> ComponentStatus {
1157        self.base.get_status().await
1158    }
1159
1160    async fn subscribe(
1161        &self,
1162        settings: drasi_lib::config::SourceSubscriptionSettings,
1163    ) -> Result<SubscriptionResponse> {
1164        self.base
1165            .subscribe_with_bootstrap(&settings, "Platform")
1166            .await
1167    }
1168
1169    fn as_any(&self) -> &dyn std::any::Any {
1170        self
1171    }
1172
1173    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1174        self.base.initialize(context).await;
1175    }
1176
1177    async fn set_bootstrap_provider(
1178        &self,
1179        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1180    ) {
1181        self.base.set_bootstrap_provider(provider).await;
1182    }
1183}
1184
1185impl PlatformSource {
1186    /// Create a test subscription to this source (synchronous)
1187    ///
1188    /// This method delegates to SourceBase and is provided for convenience in tests.
1189    /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1190    pub fn test_subscribe(
1191        &self,
1192    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1193        self.base.test_subscribe()
1194    }
1195
1196    /// Create a test subscription to this source (async)
1197    ///
1198    /// This method delegates to SourceBase and is provided for convenience in async tests.
1199    /// Prefer this method over test_subscribe() in async contexts.
1200    pub async fn test_subscribe_async(
1201        &self,
1202    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1203        self.base
1204            .create_streaming_receiver()
1205            .await
1206            .expect("Failed to create test subscription")
1207    }
1208}
1209
1210/// Extract event data from Redis stream entry
1211fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1212    // Look for common field names
1213    for key in &["data", "event", "payload", "message"] {
1214        if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1215            return String::from_utf8(data.clone())
1216                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1217        }
1218    }
1219
1220    Err(anyhow::anyhow!(
1221        "No event data found in stream entry. Available keys: {:?}",
1222        entry_map.keys().collect::<Vec<_>>()
1223    ))
1224}
1225
1226/// Check if error is a connection error
1227fn is_connection_error(e: &redis::RedisError) -> bool {
1228    e.is_connection_dropped()
1229        || e.is_io_error()
1230        || e.to_string().contains("connection")
1231        || e.to_string().contains("EOF")
1232}
1233
1234/// Message type discriminator
1235#[derive(Debug, Clone, PartialEq)]
1236enum MessageType {
1237    /// Control message with control type from source.table
1238    Control(String),
1239    /// Data message (node/relation change)
1240    Data,
1241}
1242
1243/// Detect message type based on payload.source.db field
1244///
1245/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1246/// Returns MessageType::Data for all other cases
1247fn detect_message_type(cloud_event: &Value) -> MessageType {
1248    // Extract data array and get first event to check message type
1249    let data_array = match cloud_event["data"].as_array() {
1250        Some(arr) if !arr.is_empty() => arr,
1251        _ => return MessageType::Data, // Default to data if no data array
1252    };
1253
1254    // Check the first event's source.db field
1255    let first_event = &data_array[0];
1256    let source_db = first_event["payload"]["source"]["db"]
1257        .as_str()
1258        .unwrap_or("");
1259
1260    // Case-insensitive comparison with "Drasi"
1261    if source_db.eq_ignore_ascii_case("drasi") {
1262        // Extract source.table to determine control type
1263        let control_type = first_event["payload"]["source"]["table"]
1264            .as_str()
1265            .unwrap_or("Unknown")
1266            .to_string();
1267        MessageType::Control(control_type)
1268    } else {
1269        MessageType::Data
1270    }
1271}
1272
1273/// Helper struct to hold SourceChange along with reactivator timestamps
1274#[derive(Debug)]
1275struct SourceChangeWithTimestamps {
1276    source_change: SourceChange,
1277    reactivator_start_ns: Option<u64>,
1278    reactivator_end_ns: Option<u64>,
1279}
1280
1281/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1282///
1283/// Handles events in CloudEvent format with a data array containing change events.
1284/// Each event in the data array has: op, payload.after/before, payload.source
1285fn transform_platform_event(
1286    cloud_event: Value,
1287    source_id: &str,
1288) -> Result<Vec<SourceChangeWithTimestamps>> {
1289    let mut source_changes = Vec::new();
1290
1291    // Extract the data array from CloudEvent wrapper
1292    let data_array = cloud_event["data"]
1293        .as_array()
1294        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1295
1296    // Process each event in the data array
1297    for event in data_array {
1298        // Extract reactivator timestamps from top-level event fields
1299        let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1300        let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1301
1302        // Extract operation type (op field: "i", "u", "d")
1303        let op = event["op"]
1304            .as_str()
1305            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1306
1307        // Extract payload
1308        let payload = &event["payload"];
1309        if payload.is_null() {
1310            return Err(anyhow::anyhow!("Missing 'payload' field"));
1311        }
1312
1313        // Extract element type from payload.source.table
1314        let element_type = payload["source"]["table"]
1315            .as_str()
1316            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1317
1318        // Get element data based on operation
1319        let element_data = match op {
1320            "i" | "u" => &payload["after"],
1321            "d" => &payload["before"],
1322            _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1323        };
1324
1325        if element_data.is_null() {
1326            return Err(anyhow::anyhow!(
1327                "Missing element data (after/before) for operation {op}"
1328            ));
1329        }
1330
1331        // Extract element ID
1332        let element_id = element_data["id"]
1333            .as_str()
1334            .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1335
1336        // Extract labels
1337        let labels_array = element_data["labels"]
1338            .as_array()
1339            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1340
1341        let labels: Vec<Arc<str>> = labels_array
1342            .iter()
1343            .filter_map(|v| v.as_str().map(Arc::from))
1344            .collect();
1345
1346        if labels.is_empty() {
1347            return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1348        }
1349
1350        // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1351        let ts_ns = payload["source"]["ts_ns"]
1352            .as_u64()
1353            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1354        let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1355
1356        // Build ElementMetadata
1357        let reference = ElementReference::new(source_id, element_id);
1358        let metadata = ElementMetadata {
1359            reference,
1360            labels: labels.into(),
1361            effective_from,
1362        };
1363
1364        // Handle delete operation (no properties needed)
1365        if op == "d" {
1366            source_changes.push(SourceChangeWithTimestamps {
1367                source_change: SourceChange::Delete { metadata },
1368                reactivator_start_ns,
1369                reactivator_end_ns,
1370            });
1371            continue;
1372        }
1373
1374        // Convert properties
1375        let properties_obj = element_data["properties"]
1376            .as_object()
1377            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1378
1379        let properties = convert_json_to_element_properties(properties_obj);
1380
1381        // Build element based on type
1382        let element = match element_type {
1383            "node" => Element::Node {
1384                metadata,
1385                properties,
1386            },
1387            "rel" | "relation" => {
1388                // Extract startId and endId
1389                let start_id = element_data["startId"]
1390                    .as_str()
1391                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1392                let end_id = element_data["endId"]
1393                    .as_str()
1394                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1395
1396                Element::Relation {
1397                    metadata,
1398                    properties,
1399                    in_node: ElementReference::new(source_id, start_id),
1400                    out_node: ElementReference::new(source_id, end_id),
1401                }
1402            }
1403            _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1404        };
1405
1406        // Build SourceChange
1407        let source_change = match op {
1408            "i" => SourceChange::Insert { element },
1409            "u" => SourceChange::Update { element },
1410            _ => unreachable!(),
1411        };
1412
1413        source_changes.push(SourceChangeWithTimestamps {
1414            source_change,
1415            reactivator_start_ns,
1416            reactivator_end_ns,
1417        });
1418    }
1419
1420    Ok(source_changes)
1421}
1422
1423/// Transform CloudEvent-wrapped control event to SourceControl(s)
1424///
1425/// Handles control messages from Query API service with source.db = "Drasi"
1426/// Currently supports "SourceSubscription" control type
1427fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1428    let mut control_events = Vec::new();
1429
1430    // Extract the data array from CloudEvent wrapper
1431    let data_array = cloud_event["data"]
1432        .as_array()
1433        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1434
1435    // Check if control type is supported
1436    if control_type != "SourceSubscription" {
1437        info!(
1438            "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1439        );
1440        return Ok(control_events); // Return empty vec for unknown types
1441    }
1442
1443    // Process each event in the data array
1444    for event in data_array {
1445        // Extract operation type (op field: "i", "u", "d")
1446        let op = event["op"]
1447            .as_str()
1448            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1449
1450        // Extract payload
1451        let payload = &event["payload"];
1452        if payload.is_null() {
1453            warn!("Missing 'payload' field in control event, skipping");
1454            continue;
1455        }
1456
1457        // Get data based on operation
1458        let control_data = match op {
1459            "i" | "u" => &payload["after"],
1460            "d" => &payload["before"],
1461            _ => {
1462                warn!("Unknown operation type in control event: {op}, skipping");
1463                continue;
1464            }
1465        };
1466
1467        if control_data.is_null() {
1468            warn!("Missing control data (after/before) for operation {op}, skipping");
1469            continue;
1470        }
1471
1472        // Extract required fields for SourceSubscription
1473        let query_id = match control_data["queryId"].as_str() {
1474            Some(id) => id.to_string(),
1475            None => {
1476                warn!("Missing required 'queryId' field in control event, skipping");
1477                continue;
1478            }
1479        };
1480
1481        let query_node_id = match control_data["queryNodeId"].as_str() {
1482            Some(id) => id.to_string(),
1483            None => {
1484                warn!("Missing required 'queryNodeId' field in control event, skipping");
1485                continue;
1486            }
1487        };
1488
1489        // Extract optional label arrays (default to empty if missing)
1490        let node_labels = control_data["nodeLabels"]
1491            .as_array()
1492            .map(|arr| {
1493                arr.iter()
1494                    .filter_map(|v| v.as_str().map(String::from))
1495                    .collect()
1496            })
1497            .unwrap_or_default();
1498
1499        let rel_labels = control_data["relLabels"]
1500            .as_array()
1501            .map(|arr| {
1502                arr.iter()
1503                    .filter_map(|v| v.as_str().map(String::from))
1504                    .collect()
1505            })
1506            .unwrap_or_default();
1507
1508        // Map operation to ControlOperation
1509        let operation = match op {
1510            "i" => ControlOperation::Insert,
1511            "u" => ControlOperation::Update,
1512            "d" => ControlOperation::Delete,
1513            _ => unreachable!(), // Already filtered above
1514        };
1515
1516        // Build SourceControl::Subscription
1517        let control_event = SourceControl::Subscription {
1518            query_id,
1519            query_node_id,
1520            node_labels,
1521            rel_labels,
1522            operation,
1523        };
1524
1525        control_events.push(control_event);
1526    }
1527
1528    Ok(control_events)
1529}
1530
1531/// Dynamic plugin entry point.
1532///
1533/// Dynamic plugin entry point.
1534#[cfg(feature = "dynamic-plugin")]
1535drasi_plugin_sdk::export_plugin!(
1536    plugin_id = "platform-source",
1537    core_version = env!("CARGO_PKG_VERSION"),
1538    lib_version = env!("CARGO_PKG_VERSION"),
1539    plugin_version = env!("CARGO_PKG_VERSION"),
1540    source_descriptors = [descriptor::PlatformSourceDescriptor],
1541    reaction_descriptors = [],
1542    bootstrap_descriptors = [],
1543);