Skip to main content

drasi_source_platform/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Platform Source Plugin for Drasi
17//!
18//! This plugin consumes data change events from Redis Streams, which is the primary
19//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
20//! containing node and relation changes, as well as control events for query subscriptions.
21//!
22//! # Architecture
23//!
24//! The platform source connects to Redis as a consumer group member, enabling:
25//! - **At-least-once delivery**: Messages are acknowledged after processing
26//! - **Horizontal scaling**: Multiple consumers can share the workload
27//! - **Fault tolerance**: Unacknowledged messages are redelivered
28//!
29//! # Configuration
30//!
31//! | Field | Type | Default | Description |
32//! |-------|------|---------|-------------|
33//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
34//! | `stream_key` | string | *required* | Redis stream key to consume from |
35//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
36//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
37//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
38//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
39//!
40//! # Data Format
41//!
42//! The platform source expects CloudEvent-wrapped messages with a `data` array
43//! containing change events. Each event includes an operation type and payload.
44//!
45//! ## Node Insert
46//!
47//! ```json
48//! {
49//!     "data": [{
50//!         "op": "i",
51//!         "payload": {
52//!             "after": {
53//!                 "id": "user-123",
54//!                 "labels": ["User"],
55//!                 "properties": {
56//!                     "name": "Alice",
57//!                     "email": "alice@example.com"
58//!                 }
59//!             },
60//!             "source": {
61//!                 "db": "mydb",
62//!                 "table": "node",
63//!                 "ts_ns": 1699900000000000000
64//!             }
65//!         }
66//!     }]
67//! }
68//! ```
69//!
70//! ## Node Update
71//!
72//! ```json
73//! {
74//!     "data": [{
75//!         "op": "u",
76//!         "payload": {
77//!             "after": {
78//!                 "id": "user-123",
79//!                 "labels": ["User"],
80//!                 "properties": { "name": "Alice Updated" }
81//!             },
82//!             "source": { "table": "node", "ts_ns": 1699900001000000000 }
83//!         }
84//!     }]
85//! }
86//! ```
87//!
88//! ## Node Delete
89//!
90//! ```json
91//! {
92//!     "data": [{
93//!         "op": "d",
94//!         "payload": {
95//!             "before": {
96//!                 "id": "user-123",
97//!                 "labels": ["User"],
98//!                 "properties": {}
99//!             },
100//!             "source": { "table": "node", "ts_ns": 1699900002000000000 }
101//!         }
102//!     }]
103//! }
104//! ```
105//!
106//! ## Relation Insert
107//!
108//! ```json
109//! {
110//!     "data": [{
111//!         "op": "i",
112//!         "payload": {
113//!             "after": {
114//!                 "id": "follows-1",
115//!                 "labels": ["FOLLOWS"],
116//!                 "startId": "user-123",
117//!                 "endId": "user-456",
118//!                 "properties": { "since": "2024-01-01" }
119//!             },
120//!             "source": { "table": "rel", "ts_ns": 1699900003000000000 }
121//!         }
122//!     }]
123//! }
124//! ```
125//!
126//! # Control Events
127//!
128//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
129//! Currently supported control types:
130//!
131//! - **SourceSubscription**: Query subscription management
132//!
133//! # Example Configuration (YAML)
134//!
135//! ```yaml
136//! source_type: platform
137//! properties:
138//!   redis_url: "redis://localhost:6379"
139//!   stream_key: "my-app-changes"
140//!   consumer_group: "drasi-consumers"
141//!   batch_size: 50
142//!   block_ms: 10000
143//! ```
144//!
145//! # Usage Example
146//!
147//! ```rust,ignore
//! use drasi_source_platform::PlatformSource;
//! use std::sync::Arc;
//!
//! let source = PlatformSource::builder("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
159//! ```
160
161pub mod config;
162pub mod descriptor;
163pub use config::PlatformSourceConfig;
164
165use anyhow::Result;
166use log::{debug, error, info, warn};
167use redis::streams::StreamReadReply;
168use serde_json::Value;
169use std::collections::HashMap;
170use std::sync::Arc;
171use std::time::Duration;
172use tokio::sync::RwLock;
173use tokio::task::JoinHandle;
174
175use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
176use drasi_lib::channels::{
177    ComponentStatus, ControlOperation, DispatchMode, SourceControl, SourceEvent,
178    SourceEventWrapper, SubscriptionResponse,
179};
180use drasi_lib::component_graph::ComponentStatusHandle;
181use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
182use drasi_lib::sources::manager::convert_json_to_element_properties;
183use drasi_lib::Source;
184use tracing::Instrument;
185
186#[cfg(test)]
187mod tests;
188
/// Internal runtime settings for the Redis stream consumer.
///
/// This private struct is what `PlatformSource::parse_config` produces from a
/// property map and what `PlatformSource::start_consumer_task` receives. It is
/// distinct from the public [`PlatformSourceConfig`] stored on [`PlatformSource`].
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// Redis connection URL
    redis_url: String,
    /// Redis stream key to read from
    stream_key: String,
    /// Consumer group name
    consumer_group: String,
    /// Consumer name (should be unique per instance)
    consumer_name: String,
    /// Number of events to read per XREADGROUP call (sent as the `COUNT` argument)
    batch_size: usize,
    /// Milliseconds to block waiting for events (sent as the `BLOCK` argument)
    block_ms: u64,
    /// Stream position used when the consumer group is created:
    /// ">" means only new messages (the group is created at "$"),
    /// "0" or a specific entry ID is passed through unchanged
    start_id: String,
    /// Always recreate consumer group on startup (default: false)
    /// If true, deletes and recreates the consumer group using start_id
    /// If false, uses existing group position (ignores start_id if group exists)
    always_create_consumer_group: bool,
    /// Maximum connection retry attempts
    max_retries: usize,
    /// Delay before the first retry in milliseconds; the delay is doubled
    /// after each failed connection attempt
    retry_delay_ms: u64,
}
215
216impl Default for PlatformConfig {
217    fn default() -> Self {
218        Self {
219            redis_url: String::new(),
220            stream_key: String::new(),
221            consumer_group: String::new(),
222            consumer_name: String::new(),
223            batch_size: 10,
224            block_ms: 5000,
225            start_id: ">".to_string(),
226            always_create_consumer_group: false,
227            max_retries: 3,
228            retry_delay_ms: 1000,
229        }
230    }
231}
232
/// Platform source that reads events from Redis Streams.
///
/// This source connects to a Redis instance and consumes CloudEvent-wrapped
/// messages from a stream using consumer groups. It supports both data events
/// (node/relation changes) and control events (query subscriptions).
///
/// Instances are created with [`PlatformSource::new`] or through the fluent
/// [`PlatformSourceBuilder`] returned by [`PlatformSource::builder`].
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: Platform-specific configuration (Redis connection, stream settings)
pub struct PlatformSource {
    /// Base source implementation providing common functionality
    base: SourceBase,
    /// Public platform source configuration ([`PlatformSourceConfig`]), stored
    /// exactly as supplied by the caller or assembled by the builder
    config: PlatformSourceConfig,
}
249
/// Builder for creating [`PlatformSource`] instances.
///
/// Provides a fluent API for constructing platform sources
/// with sensible defaults. Settings left unset are replaced by their
/// defaults when [`PlatformSourceBuilder::build`] is called.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_platform::PlatformSource;
///
/// let source = PlatformSource::builder("my-platform-source")
///     .with_redis_url("redis://localhost:6379")
///     .with_stream_key("my-app-changes")
///     .with_consumer_group("my-consumers")
///     .with_batch_size(50)
///     .build()?;
/// ```
pub struct PlatformSourceBuilder {
    /// Identifier given to the source being built
    id: String,
    /// Redis connection URL; starts as an empty string
    redis_url: String,
    /// Redis stream key to consume from; starts as an empty string
    stream_key: String,
    /// Consumer group name; `build` falls back to `"drasi-core"` when unset
    consumer_group: Option<String>,
    /// Consumer name within the group; passed through to the config as-is
    consumer_name: Option<String>,
    /// Events to read per call; `build` falls back to `100` when unset
    batch_size: Option<usize>,
    /// Milliseconds to block waiting for events; `build` falls back to `5000` when unset
    block_ms: Option<u64>,
    /// Dispatch mode override, applied to the base parameters only when set
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity override, applied to the base parameters only when set
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider, applied to the base parameters only when set
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source auto-starts; `true` unless changed via `with_auto_start`
    auto_start: bool,
}
280
281impl PlatformSourceBuilder {
282    /// Create a new builder with the given ID and default values.
283    pub fn new(id: impl Into<String>) -> Self {
284        Self {
285            id: id.into(),
286            redis_url: String::new(),
287            stream_key: String::new(),
288            consumer_group: None,
289            consumer_name: None,
290            batch_size: None,
291            block_ms: None,
292            dispatch_mode: None,
293            dispatch_buffer_capacity: None,
294            bootstrap_provider: None,
295            auto_start: true,
296        }
297    }
298
299    /// Set the Redis connection URL.
300    ///
301    /// # Arguments
302    ///
303    /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
304    pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
305        self.redis_url = url.into();
306        self
307    }
308
309    /// Set the Redis stream key to consume from.
310    ///
311    /// # Arguments
312    ///
313    /// * `key` - Name of the Redis stream
314    pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
315        self.stream_key = key.into();
316        self
317    }
318
319    /// Set the consumer group name.
320    ///
321    /// # Arguments
322    ///
323    /// * `group` - Consumer group name (default: `"drasi-core"`)
324    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
325        self.consumer_group = Some(group.into());
326        self
327    }
328
329    /// Set a unique consumer name within the group.
330    ///
331    /// # Arguments
332    ///
333    /// * `name` - Unique consumer name (auto-generated if not specified)
334    pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
335        self.consumer_name = Some(name.into());
336        self
337    }
338
339    /// Set the batch size for reading events.
340    ///
341    /// # Arguments
342    ///
343    /// * `size` - Number of events to read per XREADGROUP call (default: 100)
344    pub fn with_batch_size(mut self, size: usize) -> Self {
345        self.batch_size = Some(size);
346        self
347    }
348
349    /// Set the block timeout for waiting on events.
350    ///
351    /// # Arguments
352    ///
353    /// * `ms` - Milliseconds to block waiting for events (default: 5000)
354    pub fn with_block_ms(mut self, ms: u64) -> Self {
355        self.block_ms = Some(ms);
356        self
357    }
358
359    /// Set the dispatch mode for this source
360    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
361        self.dispatch_mode = Some(mode);
362        self
363    }
364
365    /// Set the dispatch buffer capacity for this source
366    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
367        self.dispatch_buffer_capacity = Some(capacity);
368        self
369    }
370
371    /// Set the bootstrap provider for this source
372    pub fn with_bootstrap_provider(
373        mut self,
374        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
375    ) -> Self {
376        self.bootstrap_provider = Some(Box::new(provider));
377        self
378    }
379
380    /// Set whether this source should auto-start when DrasiLib starts.
381    ///
382    /// Default is `true`. Set to `false` if this source should only be
383    /// started manually via `start_source()`.
384    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
385        self.auto_start = auto_start;
386        self
387    }
388
389    /// Set the full configuration at once
390    pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
391        self.redis_url = config.redis_url;
392        self.stream_key = config.stream_key;
393        self.consumer_group = Some(config.consumer_group);
394        self.consumer_name = config.consumer_name;
395        self.batch_size = Some(config.batch_size);
396        self.block_ms = Some(config.block_ms);
397        self
398    }
399
400    /// Build the platform source.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the source cannot be constructed.
405    pub fn build(self) -> Result<PlatformSource> {
406        let config = PlatformSourceConfig {
407            redis_url: self.redis_url,
408            stream_key: self.stream_key,
409            consumer_group: self
410                .consumer_group
411                .unwrap_or_else(|| "drasi-core".to_string()),
412            consumer_name: self.consumer_name,
413            batch_size: self.batch_size.unwrap_or(100),
414            block_ms: self.block_ms.unwrap_or(5000),
415        };
416
417        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
418        if let Some(mode) = self.dispatch_mode {
419            params = params.with_dispatch_mode(mode);
420        }
421        if let Some(capacity) = self.dispatch_buffer_capacity {
422            params = params.with_dispatch_buffer_capacity(capacity);
423        }
424        if let Some(provider) = self.bootstrap_provider {
425            params = params.with_bootstrap_provider(provider);
426        }
427
428        Ok(PlatformSource {
429            base: SourceBase::new(params)?,
430            config,
431        })
432    }
433}
434
435impl PlatformSource {
    /// Create a [`PlatformSourceBuilder`] for a source with the given `id`.
    ///
    /// Equivalent to calling `PlatformSourceBuilder::new(id)`.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use drasi_source_platform::PlatformSource;
    ///
    /// let source = PlatformSource::builder("my-platform-source")
    ///     .with_redis_url("redis://localhost:6379")
    ///     .with_stream_key("my-changes")
    ///     .with_bootstrap_provider(my_provider)
    ///     .build()?;
    /// ```
    pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
        PlatformSourceBuilder::new(id)
    }
452
453    /// Create a new platform source.
454    ///
455    /// The event channel is automatically injected when the source is added
456    /// to DrasiLib via `add_source()`.
457    ///
458    /// # Arguments
459    ///
460    /// * `id` - Unique identifier for this source instance
461    /// * `config` - Platform source configuration
462    ///
463    /// # Returns
464    ///
465    /// A new `PlatformSource` instance, or an error if construction fails.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the base source cannot be initialized.
470    ///
471    /// # Example
472    ///
473    /// ```rust,ignore
474    /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
475    ///
476    /// let config = PlatformSourceBuilder::new("my-platform-source")
477    ///     .with_redis_url("redis://localhost:6379")
478    ///     .with_stream_key("my-changes")
479    ///     .build()?;
480    /// ```
481    pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
482        let id = id.into();
483        let params = SourceBaseParams::new(id);
484        Ok(Self {
485            base: SourceBase::new(params)?,
486            config,
487        })
488    }
489
490    /// Subscribe to source events (for testing)
491    ///
492    /// This method is intended for use in tests to receive events broadcast by the source.
493    /// In production, queries subscribe to sources through the SourceManager.
494    /// Parse configuration from properties
495    #[allow(dead_code)]
496    fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
497        // Extract required fields first
498        let redis_url = properties
499            .get("redis_url")
500            .and_then(|v| v.as_str())
501            .ok_or_else(|| {
502                anyhow::anyhow!(
503                    "Configuration error: Missing required field 'redis_url'. \
504                 Platform source requires a Redis connection URL"
505                )
506            })?
507            .to_string();
508
509        let stream_key = properties
510            .get("stream_key")
511            .and_then(|v| v.as_str())
512            .ok_or_else(|| {
513                anyhow::anyhow!(
514                    "Configuration error: Missing required field 'stream_key'. \
515                 Platform source requires a Redis Stream key to read from"
516                )
517            })?
518            .to_string();
519
520        let consumer_group = properties
521            .get("consumer_group")
522            .and_then(|v| v.as_str())
523            .ok_or_else(|| {
524                anyhow::anyhow!(
525                    "Configuration error: Missing required field 'consumer_group'. \
526                 Platform source requires a consumer group name"
527                )
528            })?
529            .to_string();
530
531        let consumer_name = properties
532            .get("consumer_name")
533            .and_then(|v| v.as_str())
534            .ok_or_else(|| {
535                anyhow::anyhow!(
536                    "Configuration error: Missing required field 'consumer_name'. \
537                 Platform source requires a unique consumer name"
538                )
539            })?
540            .to_string();
541
542        // Get defaults for optional field handling
543        let defaults = PlatformConfig::default();
544
545        // Build config with optional fields
546        let config = PlatformConfig {
547            redis_url,
548            stream_key,
549            consumer_group,
550            consumer_name,
551            batch_size: properties
552                .get("batch_size")
553                .and_then(|v| v.as_u64())
554                .map(|v| v as usize)
555                .unwrap_or(defaults.batch_size),
556            block_ms: properties
557                .get("block_ms")
558                .and_then(|v| v.as_u64())
559                .unwrap_or(defaults.block_ms),
560            start_id: properties
561                .get("start_id")
562                .and_then(|v| v.as_str())
563                .map(|s| s.to_string())
564                .unwrap_or(defaults.start_id),
565            always_create_consumer_group: properties
566                .get("always_create_consumer_group")
567                .and_then(|v| v.as_bool())
568                .unwrap_or(defaults.always_create_consumer_group),
569            max_retries: properties
570                .get("max_retries")
571                .and_then(|v| v.as_u64())
572                .map(|v| v as usize)
573                .unwrap_or(defaults.max_retries),
574            retry_delay_ms: properties
575                .get("retry_delay_ms")
576                .and_then(|v| v.as_u64())
577                .unwrap_or(defaults.retry_delay_ms),
578        };
579
580        // Validate
581        if config.redis_url.is_empty() {
582            return Err(anyhow::anyhow!(
583                "Validation error: redis_url cannot be empty. \
584                 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
585            ));
586        }
587        if config.stream_key.is_empty() {
588            return Err(anyhow::anyhow!(
589                "Validation error: stream_key cannot be empty. \
590                 Please specify the Redis Stream key to read from"
591            ));
592        }
593        if config.consumer_group.is_empty() {
594            return Err(anyhow::anyhow!(
595                "Validation error: consumer_group cannot be empty. \
596                 Please specify a consumer group name for this source"
597            ));
598        }
599        if config.consumer_name.is_empty() {
600            return Err(anyhow::anyhow!(
601                "Validation error: consumer_name cannot be empty. \
602                 Please specify a unique consumer name within the consumer group"
603            ));
604        }
605
606        Ok(config)
607    }
608
609    /// Connect to Redis with retry logic
610    async fn connect_with_retry(
611        redis_url: &str,
612        max_retries: usize,
613        retry_delay_ms: u64,
614    ) -> Result<redis::aio::MultiplexedConnection> {
615        let client = redis::Client::open(redis_url)?;
616        let mut delay = retry_delay_ms;
617
618        for attempt in 0..max_retries {
619            match client.get_multiplexed_async_connection().await {
620                Ok(conn) => {
621                    info!("Successfully connected to Redis");
622                    return Ok(conn);
623                }
624                Err(e) if attempt < max_retries - 1 => {
625                    warn!(
626                        "Redis connection failed (attempt {}/{}): {}",
627                        attempt + 1,
628                        max_retries,
629                        e
630                    );
631                    tokio::time::sleep(Duration::from_millis(delay)).await;
632                    delay *= 2; // Exponential backoff
633                }
634                Err(e) => {
635                    return Err(anyhow::anyhow!(
636                        "Failed to connect to Redis after {max_retries} attempts: {e}"
637                    ));
638                }
639            }
640        }
641
642        unreachable!()
643    }
644
645    /// Create or recreate consumer group based on configuration
646    async fn create_consumer_group(
647        conn: &mut redis::aio::MultiplexedConnection,
648        stream_key: &str,
649        consumer_group: &str,
650        start_id: &str,
651        always_create: bool,
652    ) -> Result<()> {
653        // Determine the initial position for the consumer group
654        let group_start_id = if start_id == ">" {
655            "$" // ">" means only new messages, so create group at end
656        } else {
657            start_id // "0" or specific ID
658        };
659
660        // If always_create is true, delete the existing group first
661        if always_create {
662            info!(
663                "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
664            );
665
666            let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
667                .arg("DESTROY")
668                .arg(stream_key)
669                .arg(consumer_group)
670                .query_async(conn)
671                .await;
672
673            match destroy_result {
674                Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
675                Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
676                Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
677                Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
678            }
679        }
680
681        // Try to create the consumer group
682        let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
683            .arg("CREATE")
684            .arg(stream_key)
685            .arg(consumer_group)
686            .arg(group_start_id)
687            .arg("MKSTREAM")
688            .query_async(conn)
689            .await;
690
691        match result {
692            Ok(_) => {
693                info!(
694                    "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
695                );
696                Ok(())
697            }
698            Err(e) => {
699                // BUSYGROUP error means the group already exists
700                if e.to_string().contains("BUSYGROUP") {
701                    info!(
702                        "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
703                    );
704                    Ok(())
705                } else {
706                    Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
707                }
708            }
709        }
710    }
711
712    /// Start the stream consumer task
713    async fn start_consumer_task(
714        source_id: String,
715        instance_id: String,
716        platform_config: PlatformConfig,
717        dispatchers: Arc<
718            RwLock<
719                Vec<
720                    Box<
721                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
722                    >,
723                >,
724            >,
725        >,
726        reporter: ComponentStatusHandle,
727    ) -> JoinHandle<()> {
728        let source_id_for_span = source_id.clone();
729        let span = tracing::info_span!(
730            "platform_source_consumer",
731            instance_id = %instance_id,
732            component_id = %source_id_for_span,
733            component_type = "source"
734        );
735        tokio::spawn(async move {
736            info!(
737                "Starting platform source consumer for source '{}' on stream '{}'",
738                source_id, platform_config.stream_key
739            );
740
741            // Connect to Redis
742            let mut conn = match Self::connect_with_retry(
743                &platform_config.redis_url,
744                platform_config.max_retries,
745                platform_config.retry_delay_ms,
746            )
747            .await
748            {
749                Ok(conn) => conn,
750                Err(e) => {
751                    error!("Failed to connect to Redis: {e}");
752                    reporter.set_status(
753                        ComponentStatus::Stopped,
754                        Some(format!("Failed to connect to Redis: {e}")),
755                    ).await;
756                    return;
757                }
758            };
759
760            // Create consumer group
761            if let Err(e) = Self::create_consumer_group(
762                &mut conn,
763                &platform_config.stream_key,
764                &platform_config.consumer_group,
765                &platform_config.start_id,
766                platform_config.always_create_consumer_group,
767            )
768            .await
769            {
770                error!("Failed to create consumer group: {e}");
771                reporter.set_status(
772                    ComponentStatus::Stopped,
773                    Some(format!("Failed to create consumer group: {e}")),
774                ).await;
775                return;
776            }
777
778            // Main consumer loop
779            loop {
780                // Read from stream using ">" to get next undelivered messages for this consumer group
781                let read_result: Result<StreamReadReply, redis::RedisError> =
782                    redis::cmd("XREADGROUP")
783                        .arg("GROUP")
784                        .arg(&platform_config.consumer_group)
785                        .arg(&platform_config.consumer_name)
786                        .arg("COUNT")
787                        .arg(platform_config.batch_size)
788                        .arg("BLOCK")
789                        .arg(platform_config.block_ms)
790                        .arg("STREAMS")
791                        .arg(&platform_config.stream_key)
792                        .arg(">") // Always use ">" for consumer group reads
793                        .query_async(&mut conn)
794                        .await;
795
796                match read_result {
797                    Ok(reply) => {
798                        // Collect all stream IDs for batch acknowledgment
799                        let mut all_stream_ids = Vec::new();
800
801                        // Process each stream entry
802                        for stream_key in reply.keys {
803                            for stream_id in stream_key.ids {
804                                debug!("Received event from stream: {}", stream_id.id);
805
806                                // Store stream ID for batch acknowledgment
807                                all_stream_ids.push(stream_id.id.clone());
808
809                                // Extract event data
810                                match extract_event_data(&stream_id.map) {
811                                    Ok(event_json) => {
812                                        // Parse JSON
813                                        match serde_json::from_str::<Value>(&event_json) {
814                                            Ok(cloud_event) => {
815                                                // Detect message type
816                                                let message_type =
817                                                    detect_message_type(&cloud_event);
818
819                                                match message_type {
820                                                    MessageType::Control(control_type) => {
821                                                        // Handle control message
822                                                        debug!(
823                                                            "Detected control message of type: {control_type}"
824                                                        );
825
826                                                        match transform_control_event(
827                                                            cloud_event,
828                                                            &control_type,
829                                                        ) {
830                                                            Ok(control_events) => {
831                                                                // Publish control events
832                                                                for control_event in control_events
833                                                                {
834                                                                    // Create profiling metadata with timestamps
835                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
836                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
837
838                                                                    let wrapper = SourceEventWrapper::with_profiling(
839                                                                        source_id.clone(),
840                                                                        SourceEvent::Control(control_event),
841                                                                        chrono::Utc::now(),
842                                                                        profiling,
843                                                                    );
844
845                                                                    // Dispatch via helper
846                                                                    if let Err(e) = SourceBase::dispatch_from_task(
847                                                                        dispatchers.clone(),
848                                                                        wrapper,
849                                                                        &source_id,
850                                                                    )
851                                                                    .await
852                                                                    {
853                                                                        debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
854                                                                    } else {
855                                                                        debug!(
856                                                                            "Published control event for stream {}",
857                                                                            stream_id.id
858                                                                        );
859                                                                    }
860                                                                }
861                                                            }
862                                                            Err(e) => {
863                                                                warn!(
864                                                                    "Failed to transform control event {}: {}",
865                                                                    stream_id.id, e
866                                                                );
867                                                            }
868                                                        }
869                                                    }
870                                                    MessageType::Data => {
871                                                        // Handle data message
872                                                        match transform_platform_event(
873                                                            cloud_event,
874                                                            &source_id,
875                                                        ) {
876                                                            Ok(source_changes_with_timestamps) => {
877                                                                // Publish source changes
878                                                                for item in
879                                                                    source_changes_with_timestamps
880                                                                {
881                                                                    // Create profiling metadata with timestamps
882                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
883                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
884
885                                                                    // Extract source_ns from SourceChange transaction time
886                                                                    profiling.source_ns = Some(
887                                                                        item.source_change
888                                                                            .get_transaction_time(),
889                                                                    );
890
891                                                                    // Set reactivator timestamps from event
892                                                                    profiling
893                                                                        .reactivator_start_ns =
894                                                                        item.reactivator_start_ns;
895                                                                    profiling.reactivator_end_ns =
896                                                                        item.reactivator_end_ns;
897
898                                                                    let wrapper = SourceEventWrapper::with_profiling(
899                                                                        source_id.clone(),
900                                                                        SourceEvent::Change(item.source_change),
901                                                                        chrono::Utc::now(),
902                                                                        profiling,
903                                                                    );
904
905                                                                    // Dispatch via helper
906                                                                    if let Err(e) = SourceBase::dispatch_from_task(
907                                                                        dispatchers.clone(),
908                                                                        wrapper,
909                                                                        &source_id,
910                                                                    )
911                                                                    .await
912                                                                    {
913                                                                        debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
914                                                                    } else {
915                                                                        debug!(
916                                                                            "Published source change for event {}",
917                                                                            stream_id.id
918                                                                        );
919                                                                    }
920                                                                }
921                                                            }
922                                                            Err(e) => {
923                                                                warn!(
924                                                                    "Failed to transform event {}: {}",
925                                                                    stream_id.id, e
926                                                                );
927                                                                reporter.set_status(
928                                                                    ComponentStatus::Running,
929                                                                    Some(format!(
930                                                                        "Transformation error: {e}"
931                                                                    )),
932                                                                ).await;
933                                                            }
934                                                        }
935                                                    }
936                                                }
937                                            }
938                                            Err(e) => {
939                                                warn!(
940                                                    "Failed to parse JSON for event {}: {}",
941                                                    stream_id.id, e
942                                                );
943                                            }
944                                        }
945                                    }
946                                    Err(e) => {
947                                        warn!(
948                                            "Failed to extract event data from {}: {}",
949                                            stream_id.id, e
950                                        );
951                                    }
952                                }
953                            }
954                        }
955
956                        // Batch acknowledge all messages at once
957                        if !all_stream_ids.is_empty() {
958                            debug!("Acknowledging batch of {} messages", all_stream_ids.len());
959
960                            let mut cmd = redis::cmd("XACK");
961                            cmd.arg(&platform_config.stream_key)
962                                .arg(&platform_config.consumer_group);
963
964                            // Add all stream IDs to the command
965                            for stream_id in &all_stream_ids {
966                                cmd.arg(stream_id);
967                            }
968
969                            match cmd.query_async::<_, i64>(&mut conn).await {
970                                Ok(ack_count) => {
971                                    debug!("Successfully acknowledged {ack_count} messages");
972                                    if ack_count as usize != all_stream_ids.len() {
973                                        warn!(
974                                            "Acknowledged {} messages but expected {}",
975                                            ack_count,
976                                            all_stream_ids.len()
977                                        );
978                                    }
979                                }
980                                Err(e) => {
981                                    error!("Failed to acknowledge message batch: {e}");
982
983                                    // Fallback: Try acknowledging messages individually
984                                    warn!("Falling back to individual acknowledgments");
985                                    for stream_id in &all_stream_ids {
986                                        match redis::cmd("XACK")
987                                            .arg(&platform_config.stream_key)
988                                            .arg(&platform_config.consumer_group)
989                                            .arg(stream_id)
990                                            .query_async::<_, i64>(&mut conn)
991                                            .await
992                                        {
993                                            Ok(_) => {
994                                                debug!(
995                                                    "Individually acknowledged message {stream_id}"
996                                                );
997                                            }
998                                            Err(e) => {
999                                                error!(
1000                                                    "Failed to individually acknowledge message {stream_id}: {e}"
1001                                                );
1002                                            }
1003                                        }
1004                                    }
1005                                }
1006                            }
1007                        }
1008                    }
1009                    Err(e) => {
1010                        // Check if it's a connection error
1011                        if is_connection_error(&e) {
1012                            error!("Redis connection lost: {e}");
1013                            reporter.set_status(
1014                                ComponentStatus::Running,
1015                                Some(format!("Redis connection lost: {e}")),
1016                            ).await;
1017
1018                            // Try to reconnect
1019                            match Self::connect_with_retry(
1020                                &platform_config.redis_url,
1021                                platform_config.max_retries,
1022                                platform_config.retry_delay_ms,
1023                            )
1024                            .await
1025                            {
1026                                Ok(new_conn) => {
1027                                    conn = new_conn;
1028                                    info!("Reconnected to Redis");
1029                                }
1030                                Err(e) => {
1031                                    error!("Failed to reconnect to Redis: {e}");
1032                                    reporter.set_status(ComponentStatus::Stopped, None).await;
1033                                    return;
1034                                }
1035                            }
1036                        } else if !e.to_string().contains("timeout") {
1037                            // Log non-timeout errors
1038                            error!("Error reading from stream: {e}");
1039                        }
1040                    }
1041                }
1042            }
1043        }.instrument(span))
1044    }
1045}
1046
1047#[async_trait::async_trait]
1048impl Source for PlatformSource {
1049    fn id(&self) -> &str {
1050        &self.base.id
1051    }
1052
1053    fn type_name(&self) -> &str {
1054        "platform"
1055    }
1056
1057    fn properties(&self) -> HashMap<String, serde_json::Value> {
1058        use crate::descriptor::PlatformSourceConfigDto;
1059        use drasi_plugin_sdk::ConfigValue;
1060
1061        let dto = PlatformSourceConfigDto {
1062            redis_url: ConfigValue::Static(self.config.redis_url.clone()),
1063            stream_key: ConfigValue::Static(self.config.stream_key.clone()),
1064            consumer_group: ConfigValue::Static(self.config.consumer_group.clone()),
1065            consumer_name: self
1066                .config
1067                .consumer_name
1068                .as_ref()
1069                .map(|n| ConfigValue::Static(n.clone())),
1070            batch_size: ConfigValue::Static(self.config.batch_size),
1071            block_ms: ConfigValue::Static(self.config.block_ms),
1072        };
1073
1074        match serde_json::to_value(&dto) {
1075            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
1076            _ => HashMap::new(),
1077        }
1078    }
1079
1080    fn auto_start(&self) -> bool {
1081        self.base.get_auto_start()
1082    }
1083
1084    async fn start(&self) -> Result<()> {
1085        info!("Starting platform source: {}", self.base.id);
1086
1087        // Extract configuration from typed config
1088        let platform_config = PlatformConfig {
1089            redis_url: self.config.redis_url.clone(),
1090            stream_key: self.config.stream_key.clone(),
1091            consumer_group: self.config.consumer_group.clone(),
1092            consumer_name: self
1093                .config
1094                .consumer_name
1095                .clone()
1096                .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1097            batch_size: self.config.batch_size,
1098            block_ms: self.config.block_ms,
1099            start_id: ">".to_string(),
1100            always_create_consumer_group: false,
1101            max_retries: 5,
1102            retry_delay_ms: 1000,
1103        };
1104
1105        // Update status
1106        self.base
1107            .set_status(
1108                ComponentStatus::Running,
1109                Some("Platform source running".to_string()),
1110            )
1111            .await;
1112
1113        // Get instance_id from context for log routing isolation
1114        let instance_id = self
1115            .base
1116            .context()
1117            .await
1118            .map(|c| c.instance_id)
1119            .unwrap_or_default();
1120
1121        // Start consumer task
1122        let task = Self::start_consumer_task(
1123            self.base.id.clone(),
1124            instance_id,
1125            platform_config,
1126            self.base.dispatchers.clone(),
1127            self.base.status_handle(),
1128        )
1129        .await;
1130
1131        *self.base.task_handle.write().await = Some(task);
1132
1133        Ok(())
1134    }
1135
1136    async fn stop(&self) -> Result<()> {
1137        info!("Stopping platform source: {}", self.base.id);
1138
1139        // Cancel the task
1140        if let Some(handle) = self.base.task_handle.write().await.take() {
1141            handle.abort();
1142        }
1143
1144        // Update status
1145        self.base
1146            .set_status(
1147                ComponentStatus::Stopped,
1148                Some("Platform source stopped".to_string()),
1149            )
1150            .await;
1151
1152        Ok(())
1153    }
1154
1155    async fn status(&self) -> ComponentStatus {
1156        self.base.get_status().await
1157    }
1158
1159    async fn subscribe(
1160        &self,
1161        settings: drasi_lib::config::SourceSubscriptionSettings,
1162    ) -> Result<SubscriptionResponse> {
1163        self.base
1164            .subscribe_with_bootstrap(&settings, "Platform")
1165            .await
1166    }
1167
1168    fn as_any(&self) -> &dyn std::any::Any {
1169        self
1170    }
1171
1172    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1173        self.base.initialize(context).await;
1174    }
1175
1176    async fn set_bootstrap_provider(
1177        &self,
1178        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1179    ) {
1180        self.base.set_bootstrap_provider(provider).await;
1181    }
1182}
1183
/// Test-support helpers for obtaining a receiver of the events this source emits.
impl PlatformSource {
    /// Create a test subscription to this source (synchronous)
    ///
    /// This method delegates to SourceBase and is provided for convenience in tests.
    /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
    ///
    /// Returns a boxed receiver of `SourceEventWrapper` values.
    pub fn test_subscribe(
        &self,
    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
        self.base.test_subscribe()
    }

    /// Create a test subscription to this source (async)
    ///
    /// This method delegates to SourceBase and is provided for convenience in async tests.
    /// Prefer this method over test_subscribe() in async contexts.
    ///
    /// Returns a boxed receiver of `SourceEventWrapper` values obtained from
    /// the base's `create_streaming_receiver()`.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to create test subscription" if
    /// `create_streaming_receiver()` does not yield a receiver.
    pub async fn test_subscribe_async(
        &self,
    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
        self.base
            .create_streaming_receiver()
            .await
            .expect("Failed to create test subscription")
    }
}
1208
1209/// Extract event data from Redis stream entry
1210fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1211    // Look for common field names
1212    for key in &["data", "event", "payload", "message"] {
1213        if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1214            return String::from_utf8(data.clone())
1215                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1216        }
1217    }
1218
1219    Err(anyhow::anyhow!(
1220        "No event data found in stream entry. Available keys: {:?}",
1221        entry_map.keys().collect::<Vec<_>>()
1222    ))
1223}
1224
1225/// Check if error is a connection error
1226fn is_connection_error(e: &redis::RedisError) -> bool {
1227    e.is_connection_dropped()
1228        || e.is_io_error()
1229        || e.to_string().contains("connection")
1230        || e.to_string().contains("EOF")
1231}
1232
/// Message type discriminator
///
/// Classifies an incoming CloudEvent as either a control message or a regular
/// data change. Equality is total (`Eq`), since the only payload is a `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageType {
    /// Control message with control type from source.table
    Control(String),
    /// Data message (node/relation change)
    Data,
}
1241
1242/// Detect message type based on payload.source.db field
1243///
1244/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1245/// Returns MessageType::Data for all other cases
1246fn detect_message_type(cloud_event: &Value) -> MessageType {
1247    // Extract data array and get first event to check message type
1248    let data_array = match cloud_event["data"].as_array() {
1249        Some(arr) if !arr.is_empty() => arr,
1250        _ => return MessageType::Data, // Default to data if no data array
1251    };
1252
1253    // Check the first event's source.db field
1254    let first_event = &data_array[0];
1255    let source_db = first_event["payload"]["source"]["db"]
1256        .as_str()
1257        .unwrap_or("");
1258
1259    // Case-insensitive comparison with "Drasi"
1260    if source_db.eq_ignore_ascii_case("drasi") {
1261        // Extract source.table to determine control type
1262        let control_type = first_event["payload"]["source"]["table"]
1263            .as_str()
1264            .unwrap_or("Unknown")
1265            .to_string();
1266        MessageType::Control(control_type)
1267    } else {
1268        MessageType::Data
1269    }
1270}
1271
/// Helper struct to hold SourceChange along with reactivator timestamps
///
/// One value is produced per entry of a CloudEvent `data` array by
/// `transform_platform_event`.
#[derive(Debug)]
struct SourceChangeWithTimestamps {
    /// The insert, update or delete built from the event.
    source_change: SourceChange,
    /// Value of the event's top-level `reactivatorStart_ns` field, or `None`
    /// when it is absent or not an unsigned integer.
    reactivator_start_ns: Option<u64>,
    /// Value of the event's top-level `reactivatorEnd_ns` field, or `None`
    /// when it is absent or not an unsigned integer.
    reactivator_end_ns: Option<u64>,
}
1279
1280/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1281///
1282/// Handles events in CloudEvent format with a data array containing change events.
1283/// Each event in the data array has: op, payload.after/before, payload.source
1284fn transform_platform_event(
1285    cloud_event: Value,
1286    source_id: &str,
1287) -> Result<Vec<SourceChangeWithTimestamps>> {
1288    let mut source_changes = Vec::new();
1289
1290    // Extract the data array from CloudEvent wrapper
1291    let data_array = cloud_event["data"]
1292        .as_array()
1293        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1294
1295    // Process each event in the data array
1296    for event in data_array {
1297        // Extract reactivator timestamps from top-level event fields
1298        let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1299        let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1300
1301        // Extract operation type (op field: "i", "u", "d")
1302        let op = event["op"]
1303            .as_str()
1304            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1305
1306        // Extract payload
1307        let payload = &event["payload"];
1308        if payload.is_null() {
1309            return Err(anyhow::anyhow!("Missing 'payload' field"));
1310        }
1311
1312        // Extract element type from payload.source.table
1313        let element_type = payload["source"]["table"]
1314            .as_str()
1315            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1316
1317        // Get element data based on operation
1318        let element_data = match op {
1319            "i" | "u" => &payload["after"],
1320            "d" => &payload["before"],
1321            _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1322        };
1323
1324        if element_data.is_null() {
1325            return Err(anyhow::anyhow!(
1326                "Missing element data (after/before) for operation {op}"
1327            ));
1328        }
1329
1330        // Extract element ID
1331        let element_id = element_data["id"]
1332            .as_str()
1333            .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1334
1335        // Extract labels
1336        let labels_array = element_data["labels"]
1337            .as_array()
1338            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1339
1340        let labels: Vec<Arc<str>> = labels_array
1341            .iter()
1342            .filter_map(|v| v.as_str().map(Arc::from))
1343            .collect();
1344
1345        if labels.is_empty() {
1346            return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1347        }
1348
1349        // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1350        let ts_ns = payload["source"]["ts_ns"]
1351            .as_u64()
1352            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1353        let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1354
1355        // Build ElementMetadata
1356        let reference = ElementReference::new(source_id, element_id);
1357        let metadata = ElementMetadata {
1358            reference,
1359            labels: labels.into(),
1360            effective_from,
1361        };
1362
1363        // Handle delete operation (no properties needed)
1364        if op == "d" {
1365            source_changes.push(SourceChangeWithTimestamps {
1366                source_change: SourceChange::Delete { metadata },
1367                reactivator_start_ns,
1368                reactivator_end_ns,
1369            });
1370            continue;
1371        }
1372
1373        // Convert properties
1374        let properties_obj = element_data["properties"]
1375            .as_object()
1376            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1377
1378        let properties = convert_json_to_element_properties(properties_obj);
1379
1380        // Build element based on type
1381        let element = match element_type {
1382            "node" => Element::Node {
1383                metadata,
1384                properties,
1385            },
1386            "rel" | "relation" => {
1387                // Extract startId and endId
1388                let start_id = element_data["startId"]
1389                    .as_str()
1390                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1391                let end_id = element_data["endId"]
1392                    .as_str()
1393                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1394
1395                Element::Relation {
1396                    metadata,
1397                    properties,
1398                    in_node: ElementReference::new(source_id, start_id),
1399                    out_node: ElementReference::new(source_id, end_id),
1400                }
1401            }
1402            _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1403        };
1404
1405        // Build SourceChange
1406        let source_change = match op {
1407            "i" => SourceChange::Insert { element },
1408            "u" => SourceChange::Update { element },
1409            _ => unreachable!(),
1410        };
1411
1412        source_changes.push(SourceChangeWithTimestamps {
1413            source_change,
1414            reactivator_start_ns,
1415            reactivator_end_ns,
1416        });
1417    }
1418
1419    Ok(source_changes)
1420}
1421
1422/// Transform CloudEvent-wrapped control event to SourceControl(s)
1423///
1424/// Handles control messages from Query API service with source.db = "Drasi"
1425/// Currently supports "SourceSubscription" control type
1426fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1427    let mut control_events = Vec::new();
1428
1429    // Extract the data array from CloudEvent wrapper
1430    let data_array = cloud_event["data"]
1431        .as_array()
1432        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1433
1434    // Check if control type is supported
1435    if control_type != "SourceSubscription" {
1436        info!(
1437            "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1438        );
1439        return Ok(control_events); // Return empty vec for unknown types
1440    }
1441
1442    // Process each event in the data array
1443    for event in data_array {
1444        // Extract operation type (op field: "i", "u", "d")
1445        let op = event["op"]
1446            .as_str()
1447            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1448
1449        // Extract payload
1450        let payload = &event["payload"];
1451        if payload.is_null() {
1452            warn!("Missing 'payload' field in control event, skipping");
1453            continue;
1454        }
1455
1456        // Get data based on operation
1457        let control_data = match op {
1458            "i" | "u" => &payload["after"],
1459            "d" => &payload["before"],
1460            _ => {
1461                warn!("Unknown operation type in control event: {op}, skipping");
1462                continue;
1463            }
1464        };
1465
1466        if control_data.is_null() {
1467            warn!("Missing control data (after/before) for operation {op}, skipping");
1468            continue;
1469        }
1470
1471        // Extract required fields for SourceSubscription
1472        let query_id = match control_data["queryId"].as_str() {
1473            Some(id) => id.to_string(),
1474            None => {
1475                warn!("Missing required 'queryId' field in control event, skipping");
1476                continue;
1477            }
1478        };
1479
1480        let query_node_id = match control_data["queryNodeId"].as_str() {
1481            Some(id) => id.to_string(),
1482            None => {
1483                warn!("Missing required 'queryNodeId' field in control event, skipping");
1484                continue;
1485            }
1486        };
1487
1488        // Extract optional label arrays (default to empty if missing)
1489        let node_labels = control_data["nodeLabels"]
1490            .as_array()
1491            .map(|arr| {
1492                arr.iter()
1493                    .filter_map(|v| v.as_str().map(String::from))
1494                    .collect()
1495            })
1496            .unwrap_or_default();
1497
1498        let rel_labels = control_data["relLabels"]
1499            .as_array()
1500            .map(|arr| {
1501                arr.iter()
1502                    .filter_map(|v| v.as_str().map(String::from))
1503                    .collect()
1504            })
1505            .unwrap_or_default();
1506
1507        // Map operation to ControlOperation
1508        let operation = match op {
1509            "i" => ControlOperation::Insert,
1510            "u" => ControlOperation::Update,
1511            "d" => ControlOperation::Delete,
1512            _ => unreachable!(), // Already filtered above
1513        };
1514
1515        // Build SourceControl::Subscription
1516        let control_event = SourceControl::Subscription {
1517            query_id,
1518            query_node_id,
1519            node_labels,
1520            rel_labels,
1521            operation,
1522        };
1523
1524        control_events.push(control_event);
1525    }
1526
1527    Ok(control_events)
1528}
1529
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Exports this
/// crate under the plugin id `platform-source`, declaring
/// `descriptor::PlatformSourceDescriptor` as its only source descriptor and no
/// reaction or bootstrap descriptors. The core, lib and plugin version fields
/// are all taken from this crate's `CARGO_PKG_VERSION` at compile time.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "platform-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PlatformSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);