Skip to main content

drasi_source_platform/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Platform Source Plugin for Drasi
17//!
18//! This plugin consumes data change events from Redis Streams, which is the primary
19//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
20//! containing node and relation changes, as well as control events for query subscriptions.
21//!
22//! # Architecture
23//!
24//! The platform source connects to Redis as a consumer group member, enabling:
25//! - **At-least-once delivery**: Messages are acknowledged after processing
26//! - **Horizontal scaling**: Multiple consumers can share the workload
27//! - **Fault tolerance**: Unacknowledged messages are redelivered
28//!
29//! # Configuration
30//!
31//! | Field | Type | Default | Description |
32//! |-------|------|---------|-------------|
33//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
34//! | `stream_key` | string | *required* | Redis stream key to consume from |
35//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
36//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
37//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
38//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
39//!
40//! # Data Format
41//!
42//! The platform source expects CloudEvent-wrapped messages with a `data` array
43//! containing change events. Each event includes an operation type and payload.
44//!
45//! ## Node Insert
46//!
47//! ```json
48//! {
49//!     "data": [{
50//!         "op": "i",
51//!         "payload": {
52//!             "after": {
53//!                 "id": "user-123",
54//!                 "labels": ["User"],
55//!                 "properties": {
56//!                     "name": "Alice",
57//!                     "email": "alice@example.com"
58//!                 }
59//!             },
60//!             "source": {
61//!                 "db": "mydb",
62//!                 "table": "node",
63//!                 "ts_ns": 1699900000000000000
64//!             }
65//!         }
66//!     }]
67//! }
68//! ```
69//!
70//! ## Node Update
71//!
72//! ```json
73//! {
74//!     "data": [{
75//!         "op": "u",
76//!         "payload": {
77//!             "after": {
78//!                 "id": "user-123",
79//!                 "labels": ["User"],
80//!                 "properties": { "name": "Alice Updated" }
81//!             },
82//!             "source": { "table": "node", "ts_ns": 1699900001000000000 }
83//!         }
84//!     }]
85//! }
86//! ```
87//!
88//! ## Node Delete
89//!
90//! ```json
91//! {
92//!     "data": [{
93//!         "op": "d",
94//!         "payload": {
95//!             "before": {
96//!                 "id": "user-123",
97//!                 "labels": ["User"],
98//!                 "properties": {}
99//!             },
100//!             "source": { "table": "node", "ts_ns": 1699900002000000000 }
101//!         }
102//!     }]
103//! }
104//! ```
105//!
106//! ## Relation Insert
107//!
108//! ```json
109//! {
110//!     "data": [{
111//!         "op": "i",
112//!         "payload": {
113//!             "after": {
114//!                 "id": "follows-1",
115//!                 "labels": ["FOLLOWS"],
116//!                 "startId": "user-123",
117//!                 "endId": "user-456",
118//!                 "properties": { "since": "2024-01-01" }
119//!             },
120//!             "source": { "table": "rel", "ts_ns": 1699900003000000000 }
121//!         }
122//!     }]
123//! }
124//! ```
125//!
126//! # Control Events
127//!
128//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
129//! Currently supported control types:
130//!
131//! - **SourceSubscription**: Query subscription management
132//!
133//! # Example Configuration (YAML)
134//!
135//! ```yaml
136//! source_type: platform
137//! properties:
138//!   redis_url: "redis://localhost:6379"
139//!   stream_key: "my-app-changes"
140//!   consumer_group: "drasi-consumers"
141//!   batch_size: 50
142//!   block_ms: 10000
143//! ```
144//!
145//! # Usage Example
146//!
147//! ```rust,ignore
//! use drasi_source_platform::PlatformSource;
//! use std::sync::Arc;
//!
//! // The builder takes the source id up front and `build()` returns the source itself.
//! let source = PlatformSource::builder("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
159//! ```
160
161pub mod config;
162pub mod descriptor;
163pub use config::PlatformSourceConfig;
164
165use anyhow::Result;
166use log::{debug, error, info, warn};
167use redis::streams::StreamReadReply;
168use serde_json::Value;
169use std::collections::HashMap;
170use std::sync::Arc;
171use std::time::Duration;
172use tokio::sync::RwLock;
173use tokio::task::JoinHandle;
174
175use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
176use drasi_lib::channels::{
177    ComponentEvent, ComponentEventSender, ComponentStatus, ComponentType, ControlOperation,
178    DispatchMode, SourceControl, SourceEvent, SourceEventWrapper, SubscriptionResponse,
179};
180use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
181use drasi_lib::sources::manager::convert_json_to_element_properties;
182use drasi_lib::Source;
183use tracing::Instrument;
184
185#[cfg(test)]
186mod tests;
187
/// Fully-resolved runtime settings for the Redis Streams consumer.
///
/// This is the internal counterpart of the public `PlatformSourceConfig`:
/// in addition to the connection and stream identity it carries the group
/// start position and the connection retry policy.
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// Redis connection URL
    redis_url: String,
    /// Key of the Redis stream to read from
    stream_key: String,
    /// Name of the consumer group
    consumer_group: String,
    /// Name of this consumer (should be unique per instance)
    consumer_name: String,
    /// Number of events requested per XREADGROUP call
    batch_size: usize,
    /// Milliseconds to block while waiting for events
    block_ms: u64,
    /// Stream position to start from (">" for new messages only, "0" for all)
    start_id: String,
    /// Always recreate the consumer group on startup (default: false).
    /// When true, the group is deleted and recreated using `start_id`.
    /// When false, an existing group keeps its position and `start_id` is
    /// only used if the group has to be created.
    always_create_consumer_group: bool,
    /// Maximum number of connection attempts
    max_retries: usize,
    /// Initial delay between connection attempts, in milliseconds
    retry_delay_ms: u64,
}

impl Default for PlatformConfig {
    /// Defaults used for every optional setting.
    ///
    /// The four identity fields (URL, stream key, group, consumer name) have
    /// no meaningful default and are left empty.
    ///
    /// Note: `batch_size` defaults to 10 here, whereas
    /// `PlatformSourceBuilder::build` falls back to 100.
    fn default() -> Self {
        let unset = String::new;

        Self {
            // Identity fields: must be supplied by the caller.
            redis_url: unset(),
            stream_key: unset(),
            consumer_group: unset(),
            consumer_name: unset(),
            // Read behaviour.
            batch_size: 10,
            block_ms: 5000,
            // Group positioning: only new messages, keep an existing group.
            start_id: String::from(">"),
            always_create_consumer_group: false,
            // Connection retry policy.
            max_retries: 3,
            retry_delay_ms: 1000,
        }
    }
}
231
/// Platform source that reads events from Redis Streams.
///
/// This source connects to a Redis instance and consumes CloudEvent-wrapped
/// messages from a stream using consumer groups. It supports both data events
/// (node/relation changes) and control events (query subscriptions).
///
/// Instances are created either through [`PlatformSource::builder`] /
/// [`PlatformSourceBuilder::build`] or directly via [`PlatformSource::new`].
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: Platform-specific configuration (Redis connection, stream settings)
pub struct PlatformSource {
    /// Base source implementation providing common functionality.
    /// Constructed from `SourceBaseParams` when the source is created.
    base: SourceBase,
    /// Platform source configuration (Redis URL, stream key, consumer group,
    /// optional consumer name, batch size and block timeout).
    config: PlatformSourceConfig,
}
248
/// Builder for creating [`PlatformSource`] instances.
///
/// Provides a fluent API for constructing platform sources
/// with sensible defaults. Optional settings are stored as `Option`s and
/// resolved to their fallback values when `build()` is called.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_platform::PlatformSource;
///
/// let source = PlatformSource::builder("my-platform-source")
///     .with_redis_url("redis://localhost:6379")
///     .with_stream_key("my-app-changes")
///     .with_consumer_group("my-consumers")
///     .with_batch_size(50)
///     .build()?;
/// ```
pub struct PlatformSourceBuilder {
    /// Identifier of the source being built.
    id: String,
    /// Redis connection URL (empty until set).
    redis_url: String,
    /// Redis stream key to consume from (empty until set).
    stream_key: String,
    /// Consumer group name; `build()` falls back to `"drasi-core"`.
    consumer_group: Option<String>,
    /// Consumer name within the group; passed through unchanged (may be `None`).
    consumer_name: Option<String>,
    /// Events per XREADGROUP call; `build()` falls back to `100`.
    batch_size: Option<usize>,
    /// Block timeout in milliseconds; `build()` falls back to `5000`.
    block_ms: Option<u64>,
    /// Dispatch mode override; only applied to the base parameters when set.
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity override; only applied when set.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider; only applied when set.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source auto-starts (the builder initialises this to `true`).
    auto_start: bool,
}
279
280impl PlatformSourceBuilder {
281    /// Create a new builder with the given ID and default values.
282    pub fn new(id: impl Into<String>) -> Self {
283        Self {
284            id: id.into(),
285            redis_url: String::new(),
286            stream_key: String::new(),
287            consumer_group: None,
288            consumer_name: None,
289            batch_size: None,
290            block_ms: None,
291            dispatch_mode: None,
292            dispatch_buffer_capacity: None,
293            bootstrap_provider: None,
294            auto_start: true,
295        }
296    }
297
298    /// Set the Redis connection URL.
299    ///
300    /// # Arguments
301    ///
302    /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
303    pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
304        self.redis_url = url.into();
305        self
306    }
307
308    /// Set the Redis stream key to consume from.
309    ///
310    /// # Arguments
311    ///
312    /// * `key` - Name of the Redis stream
313    pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
314        self.stream_key = key.into();
315        self
316    }
317
318    /// Set the consumer group name.
319    ///
320    /// # Arguments
321    ///
322    /// * `group` - Consumer group name (default: `"drasi-core"`)
323    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
324        self.consumer_group = Some(group.into());
325        self
326    }
327
328    /// Set a unique consumer name within the group.
329    ///
330    /// # Arguments
331    ///
332    /// * `name` - Unique consumer name (auto-generated if not specified)
333    pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
334        self.consumer_name = Some(name.into());
335        self
336    }
337
338    /// Set the batch size for reading events.
339    ///
340    /// # Arguments
341    ///
342    /// * `size` - Number of events to read per XREADGROUP call (default: 100)
343    pub fn with_batch_size(mut self, size: usize) -> Self {
344        self.batch_size = Some(size);
345        self
346    }
347
348    /// Set the block timeout for waiting on events.
349    ///
350    /// # Arguments
351    ///
352    /// * `ms` - Milliseconds to block waiting for events (default: 5000)
353    pub fn with_block_ms(mut self, ms: u64) -> Self {
354        self.block_ms = Some(ms);
355        self
356    }
357
358    /// Set the dispatch mode for this source
359    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
360        self.dispatch_mode = Some(mode);
361        self
362    }
363
364    /// Set the dispatch buffer capacity for this source
365    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
366        self.dispatch_buffer_capacity = Some(capacity);
367        self
368    }
369
370    /// Set the bootstrap provider for this source
371    pub fn with_bootstrap_provider(
372        mut self,
373        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
374    ) -> Self {
375        self.bootstrap_provider = Some(Box::new(provider));
376        self
377    }
378
379    /// Set whether this source should auto-start when DrasiLib starts.
380    ///
381    /// Default is `true`. Set to `false` if this source should only be
382    /// started manually via `start_source()`.
383    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
384        self.auto_start = auto_start;
385        self
386    }
387
388    /// Set the full configuration at once
389    pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
390        self.redis_url = config.redis_url;
391        self.stream_key = config.stream_key;
392        self.consumer_group = Some(config.consumer_group);
393        self.consumer_name = config.consumer_name;
394        self.batch_size = Some(config.batch_size);
395        self.block_ms = Some(config.block_ms);
396        self
397    }
398
399    /// Build the platform source.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the source cannot be constructed.
404    pub fn build(self) -> Result<PlatformSource> {
405        let config = PlatformSourceConfig {
406            redis_url: self.redis_url,
407            stream_key: self.stream_key,
408            consumer_group: self
409                .consumer_group
410                .unwrap_or_else(|| "drasi-core".to_string()),
411            consumer_name: self.consumer_name,
412            batch_size: self.batch_size.unwrap_or(100),
413            block_ms: self.block_ms.unwrap_or(5000),
414        };
415
416        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
417        if let Some(mode) = self.dispatch_mode {
418            params = params.with_dispatch_mode(mode);
419        }
420        if let Some(capacity) = self.dispatch_buffer_capacity {
421            params = params.with_dispatch_buffer_capacity(capacity);
422        }
423        if let Some(provider) = self.bootstrap_provider {
424            params = params.with_bootstrap_provider(provider);
425        }
426
427        Ok(PlatformSource {
428            base: SourceBase::new(params)?,
429            config,
430        })
431    }
432}
433
434impl PlatformSource {
435    /// Create a builder for PlatformSource
436    ///
437    /// # Example
438    ///
439    /// ```rust,ignore
440    /// use drasi_source_platform::PlatformSource;
441    ///
442    /// let source = PlatformSource::builder("my-platform-source")
443    ///     .with_redis_url("redis://localhost:6379")
444    ///     .with_stream_key("my-changes")
445    ///     .with_bootstrap_provider(my_provider)
446    ///     .build()?;
447    /// ```
448    pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
449        PlatformSourceBuilder::new(id)
450    }
451
452    /// Create a new platform source.
453    ///
454    /// The event channel is automatically injected when the source is added
455    /// to DrasiLib via `add_source()`.
456    ///
457    /// # Arguments
458    ///
459    /// * `id` - Unique identifier for this source instance
460    /// * `config` - Platform source configuration
461    ///
462    /// # Returns
463    ///
464    /// A new `PlatformSource` instance, or an error if construction fails.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if the base source cannot be initialized.
469    ///
470    /// # Example
471    ///
472    /// ```rust,ignore
473    /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
474    ///
475    /// let config = PlatformSourceBuilder::new("my-platform-source")
476    ///     .with_redis_url("redis://localhost:6379")
477    ///     .with_stream_key("my-changes")
478    ///     .build()?;
479    /// ```
480    pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
481        let id = id.into();
482        let params = SourceBaseParams::new(id);
483        Ok(Self {
484            base: SourceBase::new(params)?,
485            config,
486        })
487    }
488
489    /// Subscribe to source events (for testing)
490    ///
491    /// This method is intended for use in tests to receive events broadcast by the source.
492    /// In production, queries subscribe to sources through the SourceManager.
493    /// Parse configuration from properties
494    #[allow(dead_code)]
495    fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
496        // Extract required fields first
497        let redis_url = properties
498            .get("redis_url")
499            .and_then(|v| v.as_str())
500            .ok_or_else(|| {
501                anyhow::anyhow!(
502                    "Configuration error: Missing required field 'redis_url'. \
503                 Platform source requires a Redis connection URL"
504                )
505            })?
506            .to_string();
507
508        let stream_key = properties
509            .get("stream_key")
510            .and_then(|v| v.as_str())
511            .ok_or_else(|| {
512                anyhow::anyhow!(
513                    "Configuration error: Missing required field 'stream_key'. \
514                 Platform source requires a Redis Stream key to read from"
515                )
516            })?
517            .to_string();
518
519        let consumer_group = properties
520            .get("consumer_group")
521            .and_then(|v| v.as_str())
522            .ok_or_else(|| {
523                anyhow::anyhow!(
524                    "Configuration error: Missing required field 'consumer_group'. \
525                 Platform source requires a consumer group name"
526                )
527            })?
528            .to_string();
529
530        let consumer_name = properties
531            .get("consumer_name")
532            .and_then(|v| v.as_str())
533            .ok_or_else(|| {
534                anyhow::anyhow!(
535                    "Configuration error: Missing required field 'consumer_name'. \
536                 Platform source requires a unique consumer name"
537                )
538            })?
539            .to_string();
540
541        // Get defaults for optional field handling
542        let defaults = PlatformConfig::default();
543
544        // Build config with optional fields
545        let config = PlatformConfig {
546            redis_url,
547            stream_key,
548            consumer_group,
549            consumer_name,
550            batch_size: properties
551                .get("batch_size")
552                .and_then(|v| v.as_u64())
553                .map(|v| v as usize)
554                .unwrap_or(defaults.batch_size),
555            block_ms: properties
556                .get("block_ms")
557                .and_then(|v| v.as_u64())
558                .unwrap_or(defaults.block_ms),
559            start_id: properties
560                .get("start_id")
561                .and_then(|v| v.as_str())
562                .map(|s| s.to_string())
563                .unwrap_or(defaults.start_id),
564            always_create_consumer_group: properties
565                .get("always_create_consumer_group")
566                .and_then(|v| v.as_bool())
567                .unwrap_or(defaults.always_create_consumer_group),
568            max_retries: properties
569                .get("max_retries")
570                .and_then(|v| v.as_u64())
571                .map(|v| v as usize)
572                .unwrap_or(defaults.max_retries),
573            retry_delay_ms: properties
574                .get("retry_delay_ms")
575                .and_then(|v| v.as_u64())
576                .unwrap_or(defaults.retry_delay_ms),
577        };
578
579        // Validate
580        if config.redis_url.is_empty() {
581            return Err(anyhow::anyhow!(
582                "Validation error: redis_url cannot be empty. \
583                 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
584            ));
585        }
586        if config.stream_key.is_empty() {
587            return Err(anyhow::anyhow!(
588                "Validation error: stream_key cannot be empty. \
589                 Please specify the Redis Stream key to read from"
590            ));
591        }
592        if config.consumer_group.is_empty() {
593            return Err(anyhow::anyhow!(
594                "Validation error: consumer_group cannot be empty. \
595                 Please specify a consumer group name for this source"
596            ));
597        }
598        if config.consumer_name.is_empty() {
599            return Err(anyhow::anyhow!(
600                "Validation error: consumer_name cannot be empty. \
601                 Please specify a unique consumer name within the consumer group"
602            ));
603        }
604
605        Ok(config)
606    }
607
608    /// Connect to Redis with retry logic
609    async fn connect_with_retry(
610        redis_url: &str,
611        max_retries: usize,
612        retry_delay_ms: u64,
613    ) -> Result<redis::aio::MultiplexedConnection> {
614        let client = redis::Client::open(redis_url)?;
615        let mut delay = retry_delay_ms;
616
617        for attempt in 0..max_retries {
618            match client.get_multiplexed_async_connection().await {
619                Ok(conn) => {
620                    info!("Successfully connected to Redis");
621                    return Ok(conn);
622                }
623                Err(e) if attempt < max_retries - 1 => {
624                    warn!(
625                        "Redis connection failed (attempt {}/{}): {}",
626                        attempt + 1,
627                        max_retries,
628                        e
629                    );
630                    tokio::time::sleep(Duration::from_millis(delay)).await;
631                    delay *= 2; // Exponential backoff
632                }
633                Err(e) => {
634                    return Err(anyhow::anyhow!(
635                        "Failed to connect to Redis after {max_retries} attempts: {e}"
636                    ));
637                }
638            }
639        }
640
641        unreachable!()
642    }
643
644    /// Create or recreate consumer group based on configuration
645    async fn create_consumer_group(
646        conn: &mut redis::aio::MultiplexedConnection,
647        stream_key: &str,
648        consumer_group: &str,
649        start_id: &str,
650        always_create: bool,
651    ) -> Result<()> {
652        // Determine the initial position for the consumer group
653        let group_start_id = if start_id == ">" {
654            "$" // ">" means only new messages, so create group at end
655        } else {
656            start_id // "0" or specific ID
657        };
658
659        // If always_create is true, delete the existing group first
660        if always_create {
661            info!(
662                "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
663            );
664
665            let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
666                .arg("DESTROY")
667                .arg(stream_key)
668                .arg(consumer_group)
669                .query_async(conn)
670                .await;
671
672            match destroy_result {
673                Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
674                Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
675                Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
676                Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
677            }
678        }
679
680        // Try to create the consumer group
681        let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
682            .arg("CREATE")
683            .arg(stream_key)
684            .arg(consumer_group)
685            .arg(group_start_id)
686            .arg("MKSTREAM")
687            .query_async(conn)
688            .await;
689
690        match result {
691            Ok(_) => {
692                info!(
693                    "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
694                );
695                Ok(())
696            }
697            Err(e) => {
698                // BUSYGROUP error means the group already exists
699                if e.to_string().contains("BUSYGROUP") {
700                    info!(
701                        "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
702                    );
703                    Ok(())
704                } else {
705                    Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
706                }
707            }
708        }
709    }
710
711    /// Start the stream consumer task
712    async fn start_consumer_task(
713        source_id: String,
714        instance_id: String,
715        platform_config: PlatformConfig,
716        dispatchers: Arc<
717            RwLock<
718                Vec<
719                    Box<
720                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
721                    >,
722                >,
723            >,
724        >,
725        event_tx: Arc<RwLock<Option<ComponentEventSender>>>,
726        status: Arc<RwLock<ComponentStatus>>,
727    ) -> JoinHandle<()> {
728        let source_id_for_span = source_id.clone();
729        let span = tracing::info_span!(
730            "platform_source_consumer",
731            instance_id = %instance_id,
732            component_id = %source_id_for_span,
733            component_type = "source"
734        );
735        tokio::spawn(async move {
736            info!(
737                "Starting platform source consumer for source '{}' on stream '{}'",
738                source_id, platform_config.stream_key
739            );
740
741            // Connect to Redis
742            let mut conn = match Self::connect_with_retry(
743                &platform_config.redis_url,
744                platform_config.max_retries,
745                platform_config.retry_delay_ms,
746            )
747            .await
748            {
749                Ok(conn) => conn,
750                Err(e) => {
751                    error!("Failed to connect to Redis: {e}");
752                    if let Some(ref tx) = *event_tx.read().await {
753                        let _ = tx
754                            .send(ComponentEvent {
755                                component_id: source_id.clone(),
756                                component_type: ComponentType::Source,
757                                status: ComponentStatus::Stopped,
758                                timestamp: chrono::Utc::now(),
759                                message: Some(format!("Failed to connect to Redis: {e}")),
760                            })
761                            .await;
762                    }
763                    *status.write().await = ComponentStatus::Stopped;
764                    return;
765                }
766            };
767
768            // Create consumer group
769            if let Err(e) = Self::create_consumer_group(
770                &mut conn,
771                &platform_config.stream_key,
772                &platform_config.consumer_group,
773                &platform_config.start_id,
774                platform_config.always_create_consumer_group,
775            )
776            .await
777            {
778                error!("Failed to create consumer group: {e}");
779                if let Some(ref tx) = *event_tx.read().await {
780                    let _ = tx
781                        .send(ComponentEvent {
782                            component_id: source_id.clone(),
783                            component_type: ComponentType::Source,
784                            status: ComponentStatus::Stopped,
785                            timestamp: chrono::Utc::now(),
786                            message: Some(format!("Failed to create consumer group: {e}")),
787                        })
788                        .await;
789                }
790                *status.write().await = ComponentStatus::Stopped;
791                return;
792            }
793
794            // Main consumer loop
795            loop {
796                // Read from stream using ">" to get next undelivered messages for this consumer group
797                let read_result: Result<StreamReadReply, redis::RedisError> =
798                    redis::cmd("XREADGROUP")
799                        .arg("GROUP")
800                        .arg(&platform_config.consumer_group)
801                        .arg(&platform_config.consumer_name)
802                        .arg("COUNT")
803                        .arg(platform_config.batch_size)
804                        .arg("BLOCK")
805                        .arg(platform_config.block_ms)
806                        .arg("STREAMS")
807                        .arg(&platform_config.stream_key)
808                        .arg(">") // Always use ">" for consumer group reads
809                        .query_async(&mut conn)
810                        .await;
811
812                match read_result {
813                    Ok(reply) => {
814                        // Collect all stream IDs for batch acknowledgment
815                        let mut all_stream_ids = Vec::new();
816
817                        // Process each stream entry
818                        for stream_key in reply.keys {
819                            for stream_id in stream_key.ids {
820                                debug!("Received event from stream: {}", stream_id.id);
821
822                                // Store stream ID for batch acknowledgment
823                                all_stream_ids.push(stream_id.id.clone());
824
825                                // Extract event data
826                                match extract_event_data(&stream_id.map) {
827                                    Ok(event_json) => {
828                                        // Parse JSON
829                                        match serde_json::from_str::<Value>(&event_json) {
830                                            Ok(cloud_event) => {
831                                                // Detect message type
832                                                let message_type =
833                                                    detect_message_type(&cloud_event);
834
835                                                match message_type {
836                                                    MessageType::Control(control_type) => {
837                                                        // Handle control message
838                                                        debug!(
839                                                            "Detected control message of type: {control_type}"
840                                                        );
841
842                                                        match transform_control_event(
843                                                            cloud_event,
844                                                            &control_type,
845                                                        ) {
846                                                            Ok(control_events) => {
847                                                                // Publish control events
848                                                                for control_event in control_events
849                                                                {
850                                                                    // Create profiling metadata with timestamps
851                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
852                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
853
854                                                                    let wrapper = SourceEventWrapper::with_profiling(
855                                                                        source_id.clone(),
856                                                                        SourceEvent::Control(control_event),
857                                                                        chrono::Utc::now(),
858                                                                        profiling,
859                                                                    );
860
861                                                                    // Dispatch via helper
862                                                                    if let Err(e) = SourceBase::dispatch_from_task(
863                                                                        dispatchers.clone(),
864                                                                        wrapper,
865                                                                        &source_id,
866                                                                    )
867                                                                    .await
868                                                                    {
869                                                                        debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
870                                                                    } else {
871                                                                        debug!(
872                                                                            "Published control event for stream {}",
873                                                                            stream_id.id
874                                                                        );
875                                                                    }
876                                                                }
877                                                            }
878                                                            Err(e) => {
879                                                                warn!(
880                                                                    "Failed to transform control event {}: {}",
881                                                                    stream_id.id, e
882                                                                );
883                                                            }
884                                                        }
885                                                    }
886                                                    MessageType::Data => {
887                                                        // Handle data message
888                                                        match transform_platform_event(
889                                                            cloud_event,
890                                                            &source_id,
891                                                        ) {
892                                                            Ok(source_changes_with_timestamps) => {
893                                                                // Publish source changes
894                                                                for item in
895                                                                    source_changes_with_timestamps
896                                                                {
897                                                                    // Create profiling metadata with timestamps
898                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
899                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
900
901                                                                    // Extract source_ns from SourceChange transaction time
902                                                                    profiling.source_ns = Some(
903                                                                        item.source_change
904                                                                            .get_transaction_time(),
905                                                                    );
906
907                                                                    // Set reactivator timestamps from event
908                                                                    profiling
909                                                                        .reactivator_start_ns =
910                                                                        item.reactivator_start_ns;
911                                                                    profiling.reactivator_end_ns =
912                                                                        item.reactivator_end_ns;
913
914                                                                    let wrapper = SourceEventWrapper::with_profiling(
915                                                                        source_id.clone(),
916                                                                        SourceEvent::Change(item.source_change),
917                                                                        chrono::Utc::now(),
918                                                                        profiling,
919                                                                    );
920
921                                                                    // Dispatch via helper
922                                                                    if let Err(e) = SourceBase::dispatch_from_task(
923                                                                        dispatchers.clone(),
924                                                                        wrapper,
925                                                                        &source_id,
926                                                                    )
927                                                                    .await
928                                                                    {
929                                                                        debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
930                                                                    } else {
931                                                                        debug!(
932                                                                            "Published source change for event {}",
933                                                                            stream_id.id
934                                                                        );
935                                                                    }
936                                                                }
937                                                            }
938                                                            Err(e) => {
939                                                                warn!(
940                                                                    "Failed to transform event {}: {}",
941                                                                    stream_id.id, e
942                                                                );
943                                                                if let Some(ref tx) =
944                                                                    *event_tx.read().await
945                                                                {
946                                                                    let _ = tx
947                                                                        .send(ComponentEvent {
948                                                                            component_id: source_id.clone(),
949                                                                            component_type:
950                                                                                ComponentType::Source,
951                                                                            status: ComponentStatus::Running,
952                                                                            timestamp: chrono::Utc::now(),
953                                                                            message: Some(format!(
954                                                                                "Transformation error: {e}"
955                                                                            )),
956                                                                        })
957                                                                        .await;
958                                                                }
959                                                            }
960                                                        }
961                                                    }
962                                                }
963                                            }
964                                            Err(e) => {
965                                                warn!(
966                                                    "Failed to parse JSON for event {}: {}",
967                                                    stream_id.id, e
968                                                );
969                                            }
970                                        }
971                                    }
972                                    Err(e) => {
973                                        warn!(
974                                            "Failed to extract event data from {}: {}",
975                                            stream_id.id, e
976                                        );
977                                    }
978                                }
979                            }
980                        }
981
982                        // Batch acknowledge all messages at once
983                        if !all_stream_ids.is_empty() {
984                            debug!("Acknowledging batch of {} messages", all_stream_ids.len());
985
986                            let mut cmd = redis::cmd("XACK");
987                            cmd.arg(&platform_config.stream_key)
988                                .arg(&platform_config.consumer_group);
989
990                            // Add all stream IDs to the command
991                            for stream_id in &all_stream_ids {
992                                cmd.arg(stream_id);
993                            }
994
995                            match cmd.query_async::<_, i64>(&mut conn).await {
996                                Ok(ack_count) => {
997                                    debug!("Successfully acknowledged {ack_count} messages");
998                                    if ack_count as usize != all_stream_ids.len() {
999                                        warn!(
1000                                            "Acknowledged {} messages but expected {}",
1001                                            ack_count,
1002                                            all_stream_ids.len()
1003                                        );
1004                                    }
1005                                }
1006                                Err(e) => {
1007                                    error!("Failed to acknowledge message batch: {e}");
1008
1009                                    // Fallback: Try acknowledging messages individually
1010                                    warn!("Falling back to individual acknowledgments");
1011                                    for stream_id in &all_stream_ids {
1012                                        match redis::cmd("XACK")
1013                                            .arg(&platform_config.stream_key)
1014                                            .arg(&platform_config.consumer_group)
1015                                            .arg(stream_id)
1016                                            .query_async::<_, i64>(&mut conn)
1017                                            .await
1018                                        {
1019                                            Ok(_) => {
1020                                                debug!(
1021                                                    "Individually acknowledged message {stream_id}"
1022                                                );
1023                                            }
1024                                            Err(e) => {
1025                                                error!(
1026                                                    "Failed to individually acknowledge message {stream_id}: {e}"
1027                                                );
1028                                            }
1029                                        }
1030                                    }
1031                                }
1032                            }
1033                        }
1034                    }
1035                    Err(e) => {
1036                        // Check if it's a connection error
1037                        if is_connection_error(&e) {
1038                            error!("Redis connection lost: {e}");
1039                            if let Some(ref tx) = *event_tx.read().await {
1040                                let _ = tx
1041                                    .send(ComponentEvent {
1042                                        component_id: source_id.clone(),
1043                                        component_type: ComponentType::Source,
1044                                        status: ComponentStatus::Running,
1045                                        timestamp: chrono::Utc::now(),
1046                                        message: Some(format!("Redis connection lost: {e}")),
1047                                    })
1048                                    .await;
1049                            }
1050
1051                            // Try to reconnect
1052                            match Self::connect_with_retry(
1053                                &platform_config.redis_url,
1054                                platform_config.max_retries,
1055                                platform_config.retry_delay_ms,
1056                            )
1057                            .await
1058                            {
1059                                Ok(new_conn) => {
1060                                    conn = new_conn;
1061                                    info!("Reconnected to Redis");
1062                                }
1063                                Err(e) => {
1064                                    error!("Failed to reconnect to Redis: {e}");
1065                                    *status.write().await = ComponentStatus::Stopped;
1066                                    return;
1067                                }
1068                            }
1069                        } else if !e.to_string().contains("timeout") {
1070                            // Log non-timeout errors
1071                            error!("Error reading from stream: {e}");
1072                        }
1073                    }
1074                }
1075            }
1076        }.instrument(span))
1077    }
1078}
1079
1080#[async_trait::async_trait]
1081impl Source for PlatformSource {
1082    fn id(&self) -> &str {
1083        &self.base.id
1084    }
1085
1086    fn type_name(&self) -> &str {
1087        "platform"
1088    }
1089
1090    fn properties(&self) -> HashMap<String, serde_json::Value> {
1091        let mut props = HashMap::new();
1092        props.insert(
1093            "redis_url".to_string(),
1094            serde_json::Value::String(self.config.redis_url.clone()),
1095        );
1096        props.insert(
1097            "stream_key".to_string(),
1098            serde_json::Value::String(self.config.stream_key.clone()),
1099        );
1100        props.insert(
1101            "consumer_group".to_string(),
1102            serde_json::Value::String(self.config.consumer_group.clone()),
1103        );
1104        if let Some(ref consumer_name) = self.config.consumer_name {
1105            props.insert(
1106                "consumer_name".to_string(),
1107                serde_json::Value::String(consumer_name.clone()),
1108            );
1109        }
1110        props.insert(
1111            "batch_size".to_string(),
1112            serde_json::Value::Number(self.config.batch_size.into()),
1113        );
1114        props.insert(
1115            "block_ms".to_string(),
1116            serde_json::Value::Number(self.config.block_ms.into()),
1117        );
1118        props
1119    }
1120
1121    fn auto_start(&self) -> bool {
1122        self.base.get_auto_start()
1123    }
1124
1125    async fn start(&self) -> Result<()> {
1126        info!("Starting platform source: {}", self.base.id);
1127
1128        // Extract configuration from typed config
1129        let platform_config = PlatformConfig {
1130            redis_url: self.config.redis_url.clone(),
1131            stream_key: self.config.stream_key.clone(),
1132            consumer_group: self.config.consumer_group.clone(),
1133            consumer_name: self
1134                .config
1135                .consumer_name
1136                .clone()
1137                .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1138            batch_size: self.config.batch_size,
1139            block_ms: self.config.block_ms,
1140            start_id: ">".to_string(),
1141            always_create_consumer_group: false,
1142            max_retries: 5,
1143            retry_delay_ms: 1000,
1144        };
1145
1146        // Update status
1147        *self.base.status.write().await = ComponentStatus::Running;
1148
1149        // Get instance_id from context for log routing isolation
1150        let instance_id = self
1151            .base
1152            .context()
1153            .await
1154            .map(|c| c.instance_id)
1155            .unwrap_or_default();
1156
1157        // Start consumer task
1158        let task = Self::start_consumer_task(
1159            self.base.id.clone(),
1160            instance_id,
1161            platform_config,
1162            self.base.dispatchers.clone(),
1163            self.base.status_tx(),
1164            self.base.status.clone(),
1165        )
1166        .await;
1167
1168        *self.base.task_handle.write().await = Some(task);
1169
1170        Ok(())
1171    }
1172
1173    async fn stop(&self) -> Result<()> {
1174        info!("Stopping platform source: {}", self.base.id);
1175
1176        // Cancel the task
1177        if let Some(handle) = self.base.task_handle.write().await.take() {
1178            handle.abort();
1179        }
1180
1181        // Update status
1182        *self.base.status.write().await = ComponentStatus::Stopped;
1183
1184        Ok(())
1185    }
1186
1187    async fn status(&self) -> ComponentStatus {
1188        self.base.status.read().await.clone()
1189    }
1190
1191    async fn subscribe(
1192        &self,
1193        settings: drasi_lib::config::SourceSubscriptionSettings,
1194    ) -> Result<SubscriptionResponse> {
1195        self.base
1196            .subscribe_with_bootstrap(&settings, "Platform")
1197            .await
1198    }
1199
1200    fn as_any(&self) -> &dyn std::any::Any {
1201        self
1202    }
1203
1204    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1205        self.base.initialize(context).await;
1206    }
1207
1208    async fn set_bootstrap_provider(
1209        &self,
1210        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1211    ) {
1212        self.base.set_bootstrap_provider(provider).await;
1213    }
1214}
1215
1216impl PlatformSource {
1217    /// Create a test subscription to this source (synchronous)
1218    ///
1219    /// This method delegates to SourceBase and is provided for convenience in tests.
1220    /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1221    pub fn test_subscribe(
1222        &self,
1223    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1224        self.base.test_subscribe()
1225    }
1226
1227    /// Create a test subscription to this source (async)
1228    ///
1229    /// This method delegates to SourceBase and is provided for convenience in async tests.
1230    /// Prefer this method over test_subscribe() in async contexts.
1231    pub async fn test_subscribe_async(
1232        &self,
1233    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1234        self.base
1235            .create_streaming_receiver()
1236            .await
1237            .expect("Failed to create test subscription")
1238    }
1239}
1240
1241/// Extract event data from Redis stream entry
1242fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1243    // Look for common field names
1244    for key in &["data", "event", "payload", "message"] {
1245        if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1246            return String::from_utf8(data.clone())
1247                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1248        }
1249    }
1250
1251    Err(anyhow::anyhow!(
1252        "No event data found in stream entry. Available keys: {:?}",
1253        entry_map.keys().collect::<Vec<_>>()
1254    ))
1255}
1256
1257/// Check if error is a connection error
1258fn is_connection_error(e: &redis::RedisError) -> bool {
1259    e.is_connection_dropped()
1260        || e.is_io_error()
1261        || e.to_string().contains("connection")
1262        || e.to_string().contains("EOF")
1263}
1264
/// Discriminates the two kinds of messages read from the stream.
///
/// Produced by `detect_message_type` and used to route an event to either the control
/// or the data transformation path.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageType {
    /// Control message; carries the control type taken from `payload.source.table`.
    Control(String),
    /// Data message (node/relation change).
    Data,
}
1273
1274/// Detect message type based on payload.source.db field
1275///
1276/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1277/// Returns MessageType::Data for all other cases
1278fn detect_message_type(cloud_event: &Value) -> MessageType {
1279    // Extract data array and get first event to check message type
1280    let data_array = match cloud_event["data"].as_array() {
1281        Some(arr) if !arr.is_empty() => arr,
1282        _ => return MessageType::Data, // Default to data if no data array
1283    };
1284
1285    // Check the first event's source.db field
1286    let first_event = &data_array[0];
1287    let source_db = first_event["payload"]["source"]["db"]
1288        .as_str()
1289        .unwrap_or("");
1290
1291    // Case-insensitive comparison with "Drasi"
1292    if source_db.eq_ignore_ascii_case("drasi") {
1293        // Extract source.table to determine control type
1294        let control_type = first_event["payload"]["source"]["table"]
1295            .as_str()
1296            .unwrap_or("Unknown")
1297            .to_string();
1298        MessageType::Control(control_type)
1299    } else {
1300        MessageType::Data
1301    }
1302}
1303
1304/// Helper struct to hold SourceChange along with reactivator timestamps
1305#[derive(Debug)]
1306struct SourceChangeWithTimestamps {
1307    source_change: SourceChange,
1308    reactivator_start_ns: Option<u64>,
1309    reactivator_end_ns: Option<u64>,
1310}
1311
1312/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1313///
1314/// Handles events in CloudEvent format with a data array containing change events.
1315/// Each event in the data array has: op, payload.after/before, payload.source
1316fn transform_platform_event(
1317    cloud_event: Value,
1318    source_id: &str,
1319) -> Result<Vec<SourceChangeWithTimestamps>> {
1320    let mut source_changes = Vec::new();
1321
1322    // Extract the data array from CloudEvent wrapper
1323    let data_array = cloud_event["data"]
1324        .as_array()
1325        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1326
1327    // Process each event in the data array
1328    for event in data_array {
1329        // Extract reactivator timestamps from top-level event fields
1330        let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1331        let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1332
1333        // Extract operation type (op field: "i", "u", "d")
1334        let op = event["op"]
1335            .as_str()
1336            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1337
1338        // Extract payload
1339        let payload = &event["payload"];
1340        if payload.is_null() {
1341            return Err(anyhow::anyhow!("Missing 'payload' field"));
1342        }
1343
1344        // Extract element type from payload.source.table
1345        let element_type = payload["source"]["table"]
1346            .as_str()
1347            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1348
1349        // Get element data based on operation
1350        let element_data = match op {
1351            "i" | "u" => &payload["after"],
1352            "d" => &payload["before"],
1353            _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1354        };
1355
1356        if element_data.is_null() {
1357            return Err(anyhow::anyhow!(
1358                "Missing element data (after/before) for operation {op}"
1359            ));
1360        }
1361
1362        // Extract element ID
1363        let element_id = element_data["id"]
1364            .as_str()
1365            .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1366
1367        // Extract labels
1368        let labels_array = element_data["labels"]
1369            .as_array()
1370            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1371
1372        let labels: Vec<Arc<str>> = labels_array
1373            .iter()
1374            .filter_map(|v| v.as_str().map(Arc::from))
1375            .collect();
1376
1377        if labels.is_empty() {
1378            return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1379        }
1380
1381        // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1382        let ts_ns = payload["source"]["ts_ns"]
1383            .as_u64()
1384            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1385        let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1386
1387        // Build ElementMetadata
1388        let reference = ElementReference::new(source_id, element_id);
1389        let metadata = ElementMetadata {
1390            reference,
1391            labels: labels.into(),
1392            effective_from,
1393        };
1394
1395        // Handle delete operation (no properties needed)
1396        if op == "d" {
1397            source_changes.push(SourceChangeWithTimestamps {
1398                source_change: SourceChange::Delete { metadata },
1399                reactivator_start_ns,
1400                reactivator_end_ns,
1401            });
1402            continue;
1403        }
1404
1405        // Convert properties
1406        let properties_obj = element_data["properties"]
1407            .as_object()
1408            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1409
1410        let properties = convert_json_to_element_properties(properties_obj)?;
1411
1412        // Build element based on type
1413        let element = match element_type {
1414            "node" => Element::Node {
1415                metadata,
1416                properties,
1417            },
1418            "rel" | "relation" => {
1419                // Extract startId and endId
1420                let start_id = element_data["startId"]
1421                    .as_str()
1422                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1423                let end_id = element_data["endId"]
1424                    .as_str()
1425                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1426
1427                Element::Relation {
1428                    metadata,
1429                    properties,
1430                    in_node: ElementReference::new(source_id, start_id),
1431                    out_node: ElementReference::new(source_id, end_id),
1432                }
1433            }
1434            _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1435        };
1436
1437        // Build SourceChange
1438        let source_change = match op {
1439            "i" => SourceChange::Insert { element },
1440            "u" => SourceChange::Update { element },
1441            _ => unreachable!(),
1442        };
1443
1444        source_changes.push(SourceChangeWithTimestamps {
1445            source_change,
1446            reactivator_start_ns,
1447            reactivator_end_ns,
1448        });
1449    }
1450
1451    Ok(source_changes)
1452}
1453
1454/// Transform CloudEvent-wrapped control event to SourceControl(s)
1455///
1456/// Handles control messages from Query API service with source.db = "Drasi"
1457/// Currently supports "SourceSubscription" control type
1458fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1459    let mut control_events = Vec::new();
1460
1461    // Extract the data array from CloudEvent wrapper
1462    let data_array = cloud_event["data"]
1463        .as_array()
1464        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1465
1466    // Check if control type is supported
1467    if control_type != "SourceSubscription" {
1468        info!(
1469            "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1470        );
1471        return Ok(control_events); // Return empty vec for unknown types
1472    }
1473
1474    // Process each event in the data array
1475    for event in data_array {
1476        // Extract operation type (op field: "i", "u", "d")
1477        let op = event["op"]
1478            .as_str()
1479            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1480
1481        // Extract payload
1482        let payload = &event["payload"];
1483        if payload.is_null() {
1484            warn!("Missing 'payload' field in control event, skipping");
1485            continue;
1486        }
1487
1488        // Get data based on operation
1489        let control_data = match op {
1490            "i" | "u" => &payload["after"],
1491            "d" => &payload["before"],
1492            _ => {
1493                warn!("Unknown operation type in control event: {op}, skipping");
1494                continue;
1495            }
1496        };
1497
1498        if control_data.is_null() {
1499            warn!("Missing control data (after/before) for operation {op}, skipping");
1500            continue;
1501        }
1502
1503        // Extract required fields for SourceSubscription
1504        let query_id = match control_data["queryId"].as_str() {
1505            Some(id) => id.to_string(),
1506            None => {
1507                warn!("Missing required 'queryId' field in control event, skipping");
1508                continue;
1509            }
1510        };
1511
1512        let query_node_id = match control_data["queryNodeId"].as_str() {
1513            Some(id) => id.to_string(),
1514            None => {
1515                warn!("Missing required 'queryNodeId' field in control event, skipping");
1516                continue;
1517            }
1518        };
1519
1520        // Extract optional label arrays (default to empty if missing)
1521        let node_labels = control_data["nodeLabels"]
1522            .as_array()
1523            .map(|arr| {
1524                arr.iter()
1525                    .filter_map(|v| v.as_str().map(String::from))
1526                    .collect()
1527            })
1528            .unwrap_or_default();
1529
1530        let rel_labels = control_data["relLabels"]
1531            .as_array()
1532            .map(|arr| {
1533                arr.iter()
1534                    .filter_map(|v| v.as_str().map(String::from))
1535                    .collect()
1536            })
1537            .unwrap_or_default();
1538
1539        // Map operation to ControlOperation
1540        let operation = match op {
1541            "i" => ControlOperation::Insert,
1542            "u" => ControlOperation::Update,
1543            "d" => ControlOperation::Delete,
1544            _ => unreachable!(), // Already filtered above
1545        };
1546
1547        // Build SourceControl::Subscription
1548        let control_event = SourceControl::Subscription {
1549            query_id,
1550            query_node_id,
1551            node_labels,
1552            rel_labels,
1553            operation,
1554        };
1555
1556        control_events.push(control_event);
1557    }
1558
1559    Ok(control_events)
1560}
1561
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Passes this
/// crate's plugin metadata to `drasi_plugin_sdk::export_plugin!`: the plugin id
/// `platform-source`, a single source descriptor
/// (`descriptor::PlatformSourceDescriptor`), and empty reaction and bootstrap
/// descriptor lists.
///
/// All three version fields are filled from `CARGO_PKG_VERSION` via `env!`.
// NOTE(review): `core_version` and `lib_version` therefore mirror this package's
// own version — confirm that is what the SDK expects.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "platform-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::PlatformSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);