Skip to main content

drasi_source_platform/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform Source Plugin for Drasi
16//!
17//! This plugin consumes data change events from Redis Streams, which is the primary
18//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
19//! containing node and relation changes, as well as control events for query subscriptions.
20//!
21//! # Architecture
22//!
23//! The platform source connects to Redis as a consumer group member, enabling:
24//! - **At-least-once delivery**: Messages are acknowledged after processing
25//! - **Horizontal scaling**: Multiple consumers can share the workload
26//! - **Fault tolerance**: Unacknowledged messages are redelivered
27//!
28//! # Configuration
29//!
30//! | Field | Type | Default | Description |
31//! |-------|------|---------|-------------|
32//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
33//! | `stream_key` | string | *required* | Redis stream key to consume from |
34//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
35//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
36//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
37//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
38//!
39//! # Data Format
40//!
41//! The platform source expects CloudEvent-wrapped messages with a `data` array
42//! containing change events. Each event includes an operation type and payload.
43//!
44//! ## Node Insert
45//!
46//! ```json
47//! {
48//!     "data": [{
49//!         "op": "i",
50//!         "payload": {
51//!             "after": {
52//!                 "id": "user-123",
53//!                 "labels": ["User"],
54//!                 "properties": {
55//!                     "name": "Alice",
56//!                     "email": "alice@example.com"
57//!                 }
58//!             },
59//!             "source": {
60//!                 "db": "mydb",
61//!                 "table": "node",
62//!                 "ts_ns": 1699900000000000000
63//!             }
64//!         }
65//!     }]
66//! }
67//! ```
68//!
69//! ## Node Update
70//!
71//! ```json
72//! {
73//!     "data": [{
74//!         "op": "u",
75//!         "payload": {
76//!             "after": {
77//!                 "id": "user-123",
78//!                 "labels": ["User"],
79//!                 "properties": { "name": "Alice Updated" }
80//!             },
81//!             "source": { "table": "node", "ts_ns": 1699900001000000000 }
82//!         }
83//!     }]
84//! }
85//! ```
86//!
87//! ## Node Delete
88//!
89//! ```json
90//! {
91//!     "data": [{
92//!         "op": "d",
93//!         "payload": {
94//!             "before": {
95//!                 "id": "user-123",
96//!                 "labels": ["User"],
97//!                 "properties": {}
98//!             },
99//!             "source": { "table": "node", "ts_ns": 1699900002000000000 }
100//!         }
101//!     }]
102//! }
103//! ```
104//!
105//! ## Relation Insert
106//!
107//! ```json
108//! {
109//!     "data": [{
110//!         "op": "i",
111//!         "payload": {
112//!             "after": {
113//!                 "id": "follows-1",
114//!                 "labels": ["FOLLOWS"],
115//!                 "startId": "user-123",
116//!                 "endId": "user-456",
117//!                 "properties": { "since": "2024-01-01" }
118//!             },
119//!             "source": { "table": "rel", "ts_ns": 1699900003000000000 }
120//!         }
121//!     }]
122//! }
123//! ```
124//!
125//! # Control Events
126//!
127//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
128//! Currently supported control types:
129//!
130//! - **SourceSubscription**: Query subscription management
131//!
132//! # Example Configuration (YAML)
133//!
134//! ```yaml
135//! source_type: platform
136//! properties:
137//!   redis_url: "redis://localhost:6379"
138//!   stream_key: "my-app-changes"
139//!   consumer_group: "drasi-consumers"
140//!   batch_size: 50
141//!   block_ms: 10000
142//! ```
143//!
144//! # Usage Example
145//!
146//! ```rust,ignore
//! use drasi_source_platform::PlatformSourceBuilder;
//! use std::sync::Arc;
//!
//! // `build()` returns the ready-to-use source, not a configuration object.
//! let source = PlatformSourceBuilder::new("platform-source")
//!     .with_redis_url("redis://localhost:6379")
//!     .with_stream_key("my-changes")
//!     .with_consumer_group("my-consumers")
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
158//! ```
159
160pub mod config;
161pub use config::PlatformSourceConfig;
162
163use anyhow::Result;
164use log::{debug, error, info, warn};
165use redis::streams::StreamReadReply;
166use serde_json::Value;
167use std::collections::HashMap;
168use std::sync::Arc;
169use std::time::Duration;
170use tokio::sync::RwLock;
171use tokio::task::JoinHandle;
172
173use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
174use drasi_lib::channels::{
175    ComponentEvent, ComponentEventSender, ComponentStatus, ComponentType, ControlOperation,
176    DispatchMode, SourceControl, SourceEvent, SourceEventWrapper, SubscriptionResponse,
177};
178use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
179use drasi_lib::sources::manager::convert_json_to_element_properties;
180use drasi_lib::Source;
181use tracing::Instrument;
182
183#[cfg(test)]
184mod tests;
185
/// Internal, fully resolved settings used by the Redis stream consumer task.
///
/// Values of this type are produced by `PlatformSource::parse_config` from a
/// loosely typed property map.
#[derive(Debug, Clone)]
struct PlatformConfig {
    /// URL used to open the Redis client.
    redis_url: String,
    /// Key of the Redis stream that is consumed.
    stream_key: String,
    /// Name of the consumer group used for `XREADGROUP`.
    consumer_group: String,
    /// Name of this consumer inside the group (should be unique per instance).
    consumer_name: String,
    /// `COUNT` passed to each `XREADGROUP` call.
    batch_size: usize,
    /// `BLOCK` timeout, in milliseconds, passed to each `XREADGROUP` call.
    block_ms: u64,
    /// Position at which a newly created group starts (">" for new entries
    /// only, "0" for the whole stream, or a specific entry ID).
    start_id: String,
    /// When `true`, the consumer group is destroyed and recreated at
    /// `start_id` on startup. When `false`, an existing group keeps its
    /// position and `start_id` only matters if the group has to be created.
    always_create_consumer_group: bool,
    /// Maximum number of connection attempts.
    max_retries: usize,
    /// Initial delay between connection attempts, in milliseconds.
    retry_delay_ms: u64,
}

impl Default for PlatformConfig {
    /// Returns a configuration with empty connection identity fields and the
    /// stock tuning values.
    ///
    /// NOTE(review): `batch_size` defaults to 10 here, whereas
    /// `PlatformSourceBuilder::build` falls back to 100 - confirm which value
    /// is intended.
    fn default() -> Self {
        // The four identity fields have no meaningful default; they start empty.
        let unset = String::new;

        Self {
            redis_url: unset(),
            stream_key: unset(),
            consumer_group: unset(),
            consumer_name: unset(),
            start_id: String::from(">"),
            always_create_consumer_group: false,
            batch_size: 10,
            block_ms: 5_000,
            max_retries: 3,
            retry_delay_ms: 1_000,
        }
    }
}
229
230/// Platform source that reads events from Redis Streams.
231///
232/// This source connects to a Redis instance and consumes CloudEvent-wrapped
233/// messages from a stream using consumer groups. It supports both data events
234/// (node/relation changes) and control events (query subscriptions).
235///
236/// # Fields
237///
238/// - `base`: Common source functionality (dispatchers, status, lifecycle)
239/// - `config`: Platform-specific configuration (Redis connection, stream settings)
240pub struct PlatformSource {
241    /// Base source implementation providing common functionality
242    base: SourceBase,
243    /// Platform source configuration
244    config: PlatformSourceConfig,
245}
246
247/// Builder for creating [`PlatformSource`] instances.
248///
249/// Provides a fluent API for constructing platform sources
250/// with sensible defaults.
251///
252/// # Example
253///
254/// ```rust,ignore
255/// use drasi_source_platform::PlatformSource;
256///
257/// let source = PlatformSource::builder("my-platform-source")
258///     .with_redis_url("redis://localhost:6379")
259///     .with_stream_key("my-app-changes")
260///     .with_consumer_group("my-consumers")
261///     .with_batch_size(50)
262///     .build()?;
263/// ```
264pub struct PlatformSourceBuilder {
265    id: String,
266    redis_url: String,
267    stream_key: String,
268    consumer_group: Option<String>,
269    consumer_name: Option<String>,
270    batch_size: Option<usize>,
271    block_ms: Option<u64>,
272    dispatch_mode: Option<DispatchMode>,
273    dispatch_buffer_capacity: Option<usize>,
274    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
275    auto_start: bool,
276}
277
278impl PlatformSourceBuilder {
279    /// Create a new builder with the given ID and default values.
280    pub fn new(id: impl Into<String>) -> Self {
281        Self {
282            id: id.into(),
283            redis_url: String::new(),
284            stream_key: String::new(),
285            consumer_group: None,
286            consumer_name: None,
287            batch_size: None,
288            block_ms: None,
289            dispatch_mode: None,
290            dispatch_buffer_capacity: None,
291            bootstrap_provider: None,
292            auto_start: true,
293        }
294    }
295
296    /// Set the Redis connection URL.
297    ///
298    /// # Arguments
299    ///
300    /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
301    pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
302        self.redis_url = url.into();
303        self
304    }
305
306    /// Set the Redis stream key to consume from.
307    ///
308    /// # Arguments
309    ///
310    /// * `key` - Name of the Redis stream
311    pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
312        self.stream_key = key.into();
313        self
314    }
315
316    /// Set the consumer group name.
317    ///
318    /// # Arguments
319    ///
320    /// * `group` - Consumer group name (default: `"drasi-core"`)
321    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
322        self.consumer_group = Some(group.into());
323        self
324    }
325
326    /// Set a unique consumer name within the group.
327    ///
328    /// # Arguments
329    ///
330    /// * `name` - Unique consumer name (auto-generated if not specified)
331    pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
332        self.consumer_name = Some(name.into());
333        self
334    }
335
336    /// Set the batch size for reading events.
337    ///
338    /// # Arguments
339    ///
340    /// * `size` - Number of events to read per XREADGROUP call (default: 100)
341    pub fn with_batch_size(mut self, size: usize) -> Self {
342        self.batch_size = Some(size);
343        self
344    }
345
346    /// Set the block timeout for waiting on events.
347    ///
348    /// # Arguments
349    ///
350    /// * `ms` - Milliseconds to block waiting for events (default: 5000)
351    pub fn with_block_ms(mut self, ms: u64) -> Self {
352        self.block_ms = Some(ms);
353        self
354    }
355
356    /// Set the dispatch mode for this source
357    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
358        self.dispatch_mode = Some(mode);
359        self
360    }
361
362    /// Set the dispatch buffer capacity for this source
363    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
364        self.dispatch_buffer_capacity = Some(capacity);
365        self
366    }
367
368    /// Set the bootstrap provider for this source
369    pub fn with_bootstrap_provider(
370        mut self,
371        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
372    ) -> Self {
373        self.bootstrap_provider = Some(Box::new(provider));
374        self
375    }
376
377    /// Set whether this source should auto-start when DrasiLib starts.
378    ///
379    /// Default is `true`. Set to `false` if this source should only be
380    /// started manually via `start_source()`.
381    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
382        self.auto_start = auto_start;
383        self
384    }
385
386    /// Set the full configuration at once
387    pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
388        self.redis_url = config.redis_url;
389        self.stream_key = config.stream_key;
390        self.consumer_group = Some(config.consumer_group);
391        self.consumer_name = config.consumer_name;
392        self.batch_size = Some(config.batch_size);
393        self.block_ms = Some(config.block_ms);
394        self
395    }
396
397    /// Build the platform source.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if the source cannot be constructed.
402    pub fn build(self) -> Result<PlatformSource> {
403        let config = PlatformSourceConfig {
404            redis_url: self.redis_url,
405            stream_key: self.stream_key,
406            consumer_group: self
407                .consumer_group
408                .unwrap_or_else(|| "drasi-core".to_string()),
409            consumer_name: self.consumer_name,
410            batch_size: self.batch_size.unwrap_or(100),
411            block_ms: self.block_ms.unwrap_or(5000),
412        };
413
414        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
415        if let Some(mode) = self.dispatch_mode {
416            params = params.with_dispatch_mode(mode);
417        }
418        if let Some(capacity) = self.dispatch_buffer_capacity {
419            params = params.with_dispatch_buffer_capacity(capacity);
420        }
421        if let Some(provider) = self.bootstrap_provider {
422            params = params.with_bootstrap_provider(provider);
423        }
424
425        Ok(PlatformSource {
426            base: SourceBase::new(params)?,
427            config,
428        })
429    }
430}
431
432impl PlatformSource {
433    /// Create a builder for PlatformSource
434    ///
435    /// # Example
436    ///
437    /// ```rust,ignore
438    /// use drasi_source_platform::PlatformSource;
439    ///
440    /// let source = PlatformSource::builder("my-platform-source")
441    ///     .with_redis_url("redis://localhost:6379")
442    ///     .with_stream_key("my-changes")
443    ///     .with_bootstrap_provider(my_provider)
444    ///     .build()?;
445    /// ```
446    pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
447        PlatformSourceBuilder::new(id)
448    }
449
450    /// Create a new platform source.
451    ///
452    /// The event channel is automatically injected when the source is added
453    /// to DrasiLib via `add_source()`.
454    ///
455    /// # Arguments
456    ///
457    /// * `id` - Unique identifier for this source instance
458    /// * `config` - Platform source configuration
459    ///
460    /// # Returns
461    ///
462    /// A new `PlatformSource` instance, or an error if construction fails.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the base source cannot be initialized.
467    ///
468    /// # Example
469    ///
470    /// ```rust,ignore
471    /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
472    ///
473    /// let config = PlatformSourceBuilder::new("my-platform-source")
474    ///     .with_redis_url("redis://localhost:6379")
475    ///     .with_stream_key("my-changes")
476    ///     .build()?;
477    /// ```
478    pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
479        let id = id.into();
480        let params = SourceBaseParams::new(id);
481        Ok(Self {
482            base: SourceBase::new(params)?,
483            config,
484        })
485    }
486
487    /// Subscribe to source events (for testing)
488    ///
489    /// This method is intended for use in tests to receive events broadcast by the source.
490    /// In production, queries subscribe to sources through the SourceManager.
491    /// Parse configuration from properties
492    #[allow(dead_code)]
493    fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
494        // Extract required fields first
495        let redis_url = properties
496            .get("redis_url")
497            .and_then(|v| v.as_str())
498            .ok_or_else(|| {
499                anyhow::anyhow!(
500                    "Configuration error: Missing required field 'redis_url'. \
501                 Platform source requires a Redis connection URL"
502                )
503            })?
504            .to_string();
505
506        let stream_key = properties
507            .get("stream_key")
508            .and_then(|v| v.as_str())
509            .ok_or_else(|| {
510                anyhow::anyhow!(
511                    "Configuration error: Missing required field 'stream_key'. \
512                 Platform source requires a Redis Stream key to read from"
513                )
514            })?
515            .to_string();
516
517        let consumer_group = properties
518            .get("consumer_group")
519            .and_then(|v| v.as_str())
520            .ok_or_else(|| {
521                anyhow::anyhow!(
522                    "Configuration error: Missing required field 'consumer_group'. \
523                 Platform source requires a consumer group name"
524                )
525            })?
526            .to_string();
527
528        let consumer_name = properties
529            .get("consumer_name")
530            .and_then(|v| v.as_str())
531            .ok_or_else(|| {
532                anyhow::anyhow!(
533                    "Configuration error: Missing required field 'consumer_name'. \
534                 Platform source requires a unique consumer name"
535                )
536            })?
537            .to_string();
538
539        // Get defaults for optional field handling
540        let defaults = PlatformConfig::default();
541
542        // Build config with optional fields
543        let config = PlatformConfig {
544            redis_url,
545            stream_key,
546            consumer_group,
547            consumer_name,
548            batch_size: properties
549                .get("batch_size")
550                .and_then(|v| v.as_u64())
551                .map(|v| v as usize)
552                .unwrap_or(defaults.batch_size),
553            block_ms: properties
554                .get("block_ms")
555                .and_then(|v| v.as_u64())
556                .unwrap_or(defaults.block_ms),
557            start_id: properties
558                .get("start_id")
559                .and_then(|v| v.as_str())
560                .map(|s| s.to_string())
561                .unwrap_or(defaults.start_id),
562            always_create_consumer_group: properties
563                .get("always_create_consumer_group")
564                .and_then(|v| v.as_bool())
565                .unwrap_or(defaults.always_create_consumer_group),
566            max_retries: properties
567                .get("max_retries")
568                .and_then(|v| v.as_u64())
569                .map(|v| v as usize)
570                .unwrap_or(defaults.max_retries),
571            retry_delay_ms: properties
572                .get("retry_delay_ms")
573                .and_then(|v| v.as_u64())
574                .unwrap_or(defaults.retry_delay_ms),
575        };
576
577        // Validate
578        if config.redis_url.is_empty() {
579            return Err(anyhow::anyhow!(
580                "Validation error: redis_url cannot be empty. \
581                 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
582            ));
583        }
584        if config.stream_key.is_empty() {
585            return Err(anyhow::anyhow!(
586                "Validation error: stream_key cannot be empty. \
587                 Please specify the Redis Stream key to read from"
588            ));
589        }
590        if config.consumer_group.is_empty() {
591            return Err(anyhow::anyhow!(
592                "Validation error: consumer_group cannot be empty. \
593                 Please specify a consumer group name for this source"
594            ));
595        }
596        if config.consumer_name.is_empty() {
597            return Err(anyhow::anyhow!(
598                "Validation error: consumer_name cannot be empty. \
599                 Please specify a unique consumer name within the consumer group"
600            ));
601        }
602
603        Ok(config)
604    }
605
606    /// Connect to Redis with retry logic
607    async fn connect_with_retry(
608        redis_url: &str,
609        max_retries: usize,
610        retry_delay_ms: u64,
611    ) -> Result<redis::aio::MultiplexedConnection> {
612        let client = redis::Client::open(redis_url)?;
613        let mut delay = retry_delay_ms;
614
615        for attempt in 0..max_retries {
616            match client.get_multiplexed_async_connection().await {
617                Ok(conn) => {
618                    info!("Successfully connected to Redis");
619                    return Ok(conn);
620                }
621                Err(e) if attempt < max_retries - 1 => {
622                    warn!(
623                        "Redis connection failed (attempt {}/{}): {}",
624                        attempt + 1,
625                        max_retries,
626                        e
627                    );
628                    tokio::time::sleep(Duration::from_millis(delay)).await;
629                    delay *= 2; // Exponential backoff
630                }
631                Err(e) => {
632                    return Err(anyhow::anyhow!(
633                        "Failed to connect to Redis after {max_retries} attempts: {e}"
634                    ));
635                }
636            }
637        }
638
639        unreachable!()
640    }
641
642    /// Create or recreate consumer group based on configuration
643    async fn create_consumer_group(
644        conn: &mut redis::aio::MultiplexedConnection,
645        stream_key: &str,
646        consumer_group: &str,
647        start_id: &str,
648        always_create: bool,
649    ) -> Result<()> {
650        // Determine the initial position for the consumer group
651        let group_start_id = if start_id == ">" {
652            "$" // ">" means only new messages, so create group at end
653        } else {
654            start_id // "0" or specific ID
655        };
656
657        // If always_create is true, delete the existing group first
658        if always_create {
659            info!(
660                "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
661            );
662
663            let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
664                .arg("DESTROY")
665                .arg(stream_key)
666                .arg(consumer_group)
667                .query_async(conn)
668                .await;
669
670            match destroy_result {
671                Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
672                Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
673                Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
674                Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
675            }
676        }
677
678        // Try to create the consumer group
679        let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
680            .arg("CREATE")
681            .arg(stream_key)
682            .arg(consumer_group)
683            .arg(group_start_id)
684            .arg("MKSTREAM")
685            .query_async(conn)
686            .await;
687
688        match result {
689            Ok(_) => {
690                info!(
691                    "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
692                );
693                Ok(())
694            }
695            Err(e) => {
696                // BUSYGROUP error means the group already exists
697                if e.to_string().contains("BUSYGROUP") {
698                    info!(
699                        "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
700                    );
701                    Ok(())
702                } else {
703                    Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
704                }
705            }
706        }
707    }
708
709    /// Start the stream consumer task
710    async fn start_consumer_task(
711        source_id: String,
712        instance_id: String,
713        platform_config: PlatformConfig,
714        dispatchers: Arc<
715            RwLock<
716                Vec<
717                    Box<
718                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
719                    >,
720                >,
721            >,
722        >,
723        event_tx: Arc<RwLock<Option<ComponentEventSender>>>,
724        status: Arc<RwLock<ComponentStatus>>,
725    ) -> JoinHandle<()> {
726        let source_id_for_span = source_id.clone();
727        let span = tracing::info_span!(
728            "platform_source_consumer",
729            instance_id = %instance_id,
730            component_id = %source_id_for_span,
731            component_type = "source"
732        );
733        tokio::spawn(async move {
734            info!(
735                "Starting platform source consumer for source '{}' on stream '{}'",
736                source_id, platform_config.stream_key
737            );
738
739            // Connect to Redis
740            let mut conn = match Self::connect_with_retry(
741                &platform_config.redis_url,
742                platform_config.max_retries,
743                platform_config.retry_delay_ms,
744            )
745            .await
746            {
747                Ok(conn) => conn,
748                Err(e) => {
749                    error!("Failed to connect to Redis: {e}");
750                    if let Some(ref tx) = *event_tx.read().await {
751                        let _ = tx
752                            .send(ComponentEvent {
753                                component_id: source_id.clone(),
754                                component_type: ComponentType::Source,
755                                status: ComponentStatus::Stopped,
756                                timestamp: chrono::Utc::now(),
757                                message: Some(format!("Failed to connect to Redis: {e}")),
758                            })
759                            .await;
760                    }
761                    *status.write().await = ComponentStatus::Stopped;
762                    return;
763                }
764            };
765
766            // Create consumer group
767            if let Err(e) = Self::create_consumer_group(
768                &mut conn,
769                &platform_config.stream_key,
770                &platform_config.consumer_group,
771                &platform_config.start_id,
772                platform_config.always_create_consumer_group,
773            )
774            .await
775            {
776                error!("Failed to create consumer group: {e}");
777                if let Some(ref tx) = *event_tx.read().await {
778                    let _ = tx
779                        .send(ComponentEvent {
780                            component_id: source_id.clone(),
781                            component_type: ComponentType::Source,
782                            status: ComponentStatus::Stopped,
783                            timestamp: chrono::Utc::now(),
784                            message: Some(format!("Failed to create consumer group: {e}")),
785                        })
786                        .await;
787                }
788                *status.write().await = ComponentStatus::Stopped;
789                return;
790            }
791
792            // Main consumer loop
793            loop {
794                // Read from stream using ">" to get next undelivered messages for this consumer group
795                let read_result: Result<StreamReadReply, redis::RedisError> =
796                    redis::cmd("XREADGROUP")
797                        .arg("GROUP")
798                        .arg(&platform_config.consumer_group)
799                        .arg(&platform_config.consumer_name)
800                        .arg("COUNT")
801                        .arg(platform_config.batch_size)
802                        .arg("BLOCK")
803                        .arg(platform_config.block_ms)
804                        .arg("STREAMS")
805                        .arg(&platform_config.stream_key)
806                        .arg(">") // Always use ">" for consumer group reads
807                        .query_async(&mut conn)
808                        .await;
809
810                match read_result {
811                    Ok(reply) => {
812                        // Collect all stream IDs for batch acknowledgment
813                        let mut all_stream_ids = Vec::new();
814
815                        // Process each stream entry
816                        for stream_key in reply.keys {
817                            for stream_id in stream_key.ids {
818                                debug!("Received event from stream: {}", stream_id.id);
819
820                                // Store stream ID for batch acknowledgment
821                                all_stream_ids.push(stream_id.id.clone());
822
823                                // Extract event data
824                                match extract_event_data(&stream_id.map) {
825                                    Ok(event_json) => {
826                                        // Parse JSON
827                                        match serde_json::from_str::<Value>(&event_json) {
828                                            Ok(cloud_event) => {
829                                                // Detect message type
830                                                let message_type =
831                                                    detect_message_type(&cloud_event);
832
833                                                match message_type {
834                                                    MessageType::Control(control_type) => {
835                                                        // Handle control message
836                                                        debug!(
837                                                            "Detected control message of type: {control_type}"
838                                                        );
839
840                                                        match transform_control_event(
841                                                            cloud_event,
842                                                            &control_type,
843                                                        ) {
844                                                            Ok(control_events) => {
845                                                                // Publish control events
846                                                                for control_event in control_events
847                                                                {
848                                                                    // Create profiling metadata with timestamps
849                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
850                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
851
852                                                                    let wrapper = SourceEventWrapper::with_profiling(
853                                                                        source_id.clone(),
854                                                                        SourceEvent::Control(control_event),
855                                                                        chrono::Utc::now(),
856                                                                        profiling,
857                                                                    );
858
859                                                                    // Dispatch via helper
860                                                                    if let Err(e) = SourceBase::dispatch_from_task(
861                                                                        dispatchers.clone(),
862                                                                        wrapper,
863                                                                        &source_id,
864                                                                    )
865                                                                    .await
866                                                                    {
867                                                                        debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
868                                                                    } else {
869                                                                        debug!(
870                                                                            "Published control event for stream {}",
871                                                                            stream_id.id
872                                                                        );
873                                                                    }
874                                                                }
875                                                            }
876                                                            Err(e) => {
877                                                                warn!(
878                                                                    "Failed to transform control event {}: {}",
879                                                                    stream_id.id, e
880                                                                );
881                                                            }
882                                                        }
883                                                    }
884                                                    MessageType::Data => {
885                                                        // Handle data message
886                                                        match transform_platform_event(
887                                                            cloud_event,
888                                                            &source_id,
889                                                        ) {
890                                                            Ok(source_changes_with_timestamps) => {
891                                                                // Publish source changes
892                                                                for item in
893                                                                    source_changes_with_timestamps
894                                                                {
895                                                                    // Create profiling metadata with timestamps
896                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
897                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
898
899                                                                    // Extract source_ns from SourceChange transaction time
900                                                                    profiling.source_ns = Some(
901                                                                        item.source_change
902                                                                            .get_transaction_time(),
903                                                                    );
904
905                                                                    // Set reactivator timestamps from event
906                                                                    profiling
907                                                                        .reactivator_start_ns =
908                                                                        item.reactivator_start_ns;
909                                                                    profiling.reactivator_end_ns =
910                                                                        item.reactivator_end_ns;
911
912                                                                    let wrapper = SourceEventWrapper::with_profiling(
913                                                                        source_id.clone(),
914                                                                        SourceEvent::Change(item.source_change),
915                                                                        chrono::Utc::now(),
916                                                                        profiling,
917                                                                    );
918
919                                                                    // Dispatch via helper
920                                                                    if let Err(e) = SourceBase::dispatch_from_task(
921                                                                        dispatchers.clone(),
922                                                                        wrapper,
923                                                                        &source_id,
924                                                                    )
925                                                                    .await
926                                                                    {
927                                                                        debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
928                                                                    } else {
929                                                                        debug!(
930                                                                            "Published source change for event {}",
931                                                                            stream_id.id
932                                                                        );
933                                                                    }
934                                                                }
935                                                            }
936                                                            Err(e) => {
937                                                                warn!(
938                                                                    "Failed to transform event {}: {}",
939                                                                    stream_id.id, e
940                                                                );
941                                                                if let Some(ref tx) =
942                                                                    *event_tx.read().await
943                                                                {
944                                                                    let _ = tx
945                                                                        .send(ComponentEvent {
946                                                                            component_id: source_id.clone(),
947                                                                            component_type:
948                                                                                ComponentType::Source,
949                                                                            status: ComponentStatus::Running,
950                                                                            timestamp: chrono::Utc::now(),
951                                                                            message: Some(format!(
952                                                                                "Transformation error: {e}"
953                                                                            )),
954                                                                        })
955                                                                        .await;
956                                                                }
957                                                            }
958                                                        }
959                                                    }
960                                                }
961                                            }
962                                            Err(e) => {
963                                                warn!(
964                                                    "Failed to parse JSON for event {}: {}",
965                                                    stream_id.id, e
966                                                );
967                                            }
968                                        }
969                                    }
970                                    Err(e) => {
971                                        warn!(
972                                            "Failed to extract event data from {}: {}",
973                                            stream_id.id, e
974                                        );
975                                    }
976                                }
977                            }
978                        }
979
980                        // Batch acknowledge all messages at once
981                        if !all_stream_ids.is_empty() {
982                            debug!("Acknowledging batch of {} messages", all_stream_ids.len());
983
984                            let mut cmd = redis::cmd("XACK");
985                            cmd.arg(&platform_config.stream_key)
986                                .arg(&platform_config.consumer_group);
987
988                            // Add all stream IDs to the command
989                            for stream_id in &all_stream_ids {
990                                cmd.arg(stream_id);
991                            }
992
993                            match cmd.query_async::<_, i64>(&mut conn).await {
994                                Ok(ack_count) => {
995                                    debug!("Successfully acknowledged {ack_count} messages");
996                                    if ack_count as usize != all_stream_ids.len() {
997                                        warn!(
998                                            "Acknowledged {} messages but expected {}",
999                                            ack_count,
1000                                            all_stream_ids.len()
1001                                        );
1002                                    }
1003                                }
1004                                Err(e) => {
1005                                    error!("Failed to acknowledge message batch: {e}");
1006
1007                                    // Fallback: Try acknowledging messages individually
1008                                    warn!("Falling back to individual acknowledgments");
1009                                    for stream_id in &all_stream_ids {
1010                                        match redis::cmd("XACK")
1011                                            .arg(&platform_config.stream_key)
1012                                            .arg(&platform_config.consumer_group)
1013                                            .arg(stream_id)
1014                                            .query_async::<_, i64>(&mut conn)
1015                                            .await
1016                                        {
1017                                            Ok(_) => {
1018                                                debug!(
1019                                                    "Individually acknowledged message {stream_id}"
1020                                                );
1021                                            }
1022                                            Err(e) => {
1023                                                error!(
1024                                                    "Failed to individually acknowledge message {stream_id}: {e}"
1025                                                );
1026                                            }
1027                                        }
1028                                    }
1029                                }
1030                            }
1031                        }
1032                    }
1033                    Err(e) => {
1034                        // Check if it's a connection error
1035                        if is_connection_error(&e) {
1036                            error!("Redis connection lost: {e}");
1037                            if let Some(ref tx) = *event_tx.read().await {
1038                                let _ = tx
1039                                    .send(ComponentEvent {
1040                                        component_id: source_id.clone(),
1041                                        component_type: ComponentType::Source,
1042                                        status: ComponentStatus::Running,
1043                                        timestamp: chrono::Utc::now(),
1044                                        message: Some(format!("Redis connection lost: {e}")),
1045                                    })
1046                                    .await;
1047                            }
1048
1049                            // Try to reconnect
1050                            match Self::connect_with_retry(
1051                                &platform_config.redis_url,
1052                                platform_config.max_retries,
1053                                platform_config.retry_delay_ms,
1054                            )
1055                            .await
1056                            {
1057                                Ok(new_conn) => {
1058                                    conn = new_conn;
1059                                    info!("Reconnected to Redis");
1060                                }
1061                                Err(e) => {
1062                                    error!("Failed to reconnect to Redis: {e}");
1063                                    *status.write().await = ComponentStatus::Stopped;
1064                                    return;
1065                                }
1066                            }
1067                        } else if !e.to_string().contains("timeout") {
1068                            // Log non-timeout errors
1069                            error!("Error reading from stream: {e}");
1070                        }
1071                    }
1072                }
1073            }
1074        }.instrument(span))
1075    }
1076}
1077
1078#[async_trait::async_trait]
1079impl Source for PlatformSource {
1080    fn id(&self) -> &str {
1081        &self.base.id
1082    }
1083
1084    fn type_name(&self) -> &str {
1085        "platform"
1086    }
1087
1088    fn properties(&self) -> HashMap<String, serde_json::Value> {
1089        let mut props = HashMap::new();
1090        props.insert(
1091            "redis_url".to_string(),
1092            serde_json::Value::String(self.config.redis_url.clone()),
1093        );
1094        props.insert(
1095            "stream_key".to_string(),
1096            serde_json::Value::String(self.config.stream_key.clone()),
1097        );
1098        props.insert(
1099            "consumer_group".to_string(),
1100            serde_json::Value::String(self.config.consumer_group.clone()),
1101        );
1102        if let Some(ref consumer_name) = self.config.consumer_name {
1103            props.insert(
1104                "consumer_name".to_string(),
1105                serde_json::Value::String(consumer_name.clone()),
1106            );
1107        }
1108        props.insert(
1109            "batch_size".to_string(),
1110            serde_json::Value::Number(self.config.batch_size.into()),
1111        );
1112        props.insert(
1113            "block_ms".to_string(),
1114            serde_json::Value::Number(self.config.block_ms.into()),
1115        );
1116        props
1117    }
1118
1119    fn auto_start(&self) -> bool {
1120        self.base.get_auto_start()
1121    }
1122
1123    async fn start(&self) -> Result<()> {
1124        info!("Starting platform source: {}", self.base.id);
1125
1126        // Extract configuration from typed config
1127        let platform_config = PlatformConfig {
1128            redis_url: self.config.redis_url.clone(),
1129            stream_key: self.config.stream_key.clone(),
1130            consumer_group: self.config.consumer_group.clone(),
1131            consumer_name: self
1132                .config
1133                .consumer_name
1134                .clone()
1135                .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1136            batch_size: self.config.batch_size,
1137            block_ms: self.config.block_ms,
1138            start_id: ">".to_string(),
1139            always_create_consumer_group: false,
1140            max_retries: 5,
1141            retry_delay_ms: 1000,
1142        };
1143
1144        // Update status
1145        *self.base.status.write().await = ComponentStatus::Running;
1146
1147        // Get instance_id from context for log routing isolation
1148        let instance_id = self
1149            .base
1150            .context()
1151            .await
1152            .map(|c| c.instance_id)
1153            .unwrap_or_default();
1154
1155        // Start consumer task
1156        let task = Self::start_consumer_task(
1157            self.base.id.clone(),
1158            instance_id,
1159            platform_config,
1160            self.base.dispatchers.clone(),
1161            self.base.status_tx(),
1162            self.base.status.clone(),
1163        )
1164        .await;
1165
1166        *self.base.task_handle.write().await = Some(task);
1167
1168        Ok(())
1169    }
1170
1171    async fn stop(&self) -> Result<()> {
1172        info!("Stopping platform source: {}", self.base.id);
1173
1174        // Cancel the task
1175        if let Some(handle) = self.base.task_handle.write().await.take() {
1176            handle.abort();
1177        }
1178
1179        // Update status
1180        *self.base.status.write().await = ComponentStatus::Stopped;
1181
1182        Ok(())
1183    }
1184
1185    async fn status(&self) -> ComponentStatus {
1186        self.base.status.read().await.clone()
1187    }
1188
1189    async fn subscribe(
1190        &self,
1191        settings: drasi_lib::config::SourceSubscriptionSettings,
1192    ) -> Result<SubscriptionResponse> {
1193        self.base
1194            .subscribe_with_bootstrap(&settings, "Platform")
1195            .await
1196    }
1197
1198    fn as_any(&self) -> &dyn std::any::Any {
1199        self
1200    }
1201
1202    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1203        self.base.initialize(context).await;
1204    }
1205
1206    async fn set_bootstrap_provider(
1207        &self,
1208        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1209    ) {
1210        self.base.set_bootstrap_provider(provider).await;
1211    }
1212}
1213
1214impl PlatformSource {
1215    /// Create a test subscription to this source (synchronous)
1216    ///
1217    /// This method delegates to SourceBase and is provided for convenience in tests.
1218    /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1219    pub fn test_subscribe(
1220        &self,
1221    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1222        self.base.test_subscribe()
1223    }
1224
1225    /// Create a test subscription to this source (async)
1226    ///
1227    /// This method delegates to SourceBase and is provided for convenience in async tests.
1228    /// Prefer this method over test_subscribe() in async contexts.
1229    pub async fn test_subscribe_async(
1230        &self,
1231    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1232        self.base
1233            .create_streaming_receiver()
1234            .await
1235            .expect("Failed to create test subscription")
1236    }
1237}
1238
1239/// Extract event data from Redis stream entry
1240fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1241    // Look for common field names
1242    for key in &["data", "event", "payload", "message"] {
1243        if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1244            return String::from_utf8(data.clone())
1245                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1246        }
1247    }
1248
1249    Err(anyhow::anyhow!(
1250        "No event data found in stream entry. Available keys: {:?}",
1251        entry_map.keys().collect::<Vec<_>>()
1252    ))
1253}
1254
1255/// Check if error is a connection error
1256fn is_connection_error(e: &redis::RedisError) -> bool {
1257    e.is_connection_dropped()
1258        || e.is_io_error()
1259        || e.to_string().contains("connection")
1260        || e.to_string().contains("EOF")
1261}
1262
/// Discriminates the two kinds of messages read from the stream.
///
/// Equality is total (`Eq`), since the only payload is a `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MessageType {
    /// Control message; carries the control type taken from `source.table`.
    Control(String),
    /// Data message (node/relation change).
    Data,
}
1271
1272/// Detect message type based on payload.source.db field
1273///
1274/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1275/// Returns MessageType::Data for all other cases
1276fn detect_message_type(cloud_event: &Value) -> MessageType {
1277    // Extract data array and get first event to check message type
1278    let data_array = match cloud_event["data"].as_array() {
1279        Some(arr) if !arr.is_empty() => arr,
1280        _ => return MessageType::Data, // Default to data if no data array
1281    };
1282
1283    // Check the first event's source.db field
1284    let first_event = &data_array[0];
1285    let source_db = first_event["payload"]["source"]["db"]
1286        .as_str()
1287        .unwrap_or("");
1288
1289    // Case-insensitive comparison with "Drasi"
1290    if source_db.eq_ignore_ascii_case("drasi") {
1291        // Extract source.table to determine control type
1292        let control_type = first_event["payload"]["source"]["table"]
1293            .as_str()
1294            .unwrap_or("Unknown")
1295            .to_string();
1296        MessageType::Control(control_type)
1297    } else {
1298        MessageType::Data
1299    }
1300}
1301
1302/// Helper struct to hold SourceChange along with reactivator timestamps
1303#[derive(Debug)]
1304struct SourceChangeWithTimestamps {
1305    source_change: SourceChange,
1306    reactivator_start_ns: Option<u64>,
1307    reactivator_end_ns: Option<u64>,
1308}
1309
1310/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1311///
1312/// Handles events in CloudEvent format with a data array containing change events.
1313/// Each event in the data array has: op, payload.after/before, payload.source
1314fn transform_platform_event(
1315    cloud_event: Value,
1316    source_id: &str,
1317) -> Result<Vec<SourceChangeWithTimestamps>> {
1318    let mut source_changes = Vec::new();
1319
1320    // Extract the data array from CloudEvent wrapper
1321    let data_array = cloud_event["data"]
1322        .as_array()
1323        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1324
1325    // Process each event in the data array
1326    for event in data_array {
1327        // Extract reactivator timestamps from top-level event fields
1328        let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1329        let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1330
1331        // Extract operation type (op field: "i", "u", "d")
1332        let op = event["op"]
1333            .as_str()
1334            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1335
1336        // Extract payload
1337        let payload = &event["payload"];
1338        if payload.is_null() {
1339            return Err(anyhow::anyhow!("Missing 'payload' field"));
1340        }
1341
1342        // Extract element type from payload.source.table
1343        let element_type = payload["source"]["table"]
1344            .as_str()
1345            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1346
1347        // Get element data based on operation
1348        let element_data = match op {
1349            "i" | "u" => &payload["after"],
1350            "d" => &payload["before"],
1351            _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1352        };
1353
1354        if element_data.is_null() {
1355            return Err(anyhow::anyhow!(
1356                "Missing element data (after/before) for operation {op}"
1357            ));
1358        }
1359
1360        // Extract element ID
1361        let element_id = element_data["id"]
1362            .as_str()
1363            .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1364
1365        // Extract labels
1366        let labels_array = element_data["labels"]
1367            .as_array()
1368            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1369
1370        let labels: Vec<Arc<str>> = labels_array
1371            .iter()
1372            .filter_map(|v| v.as_str().map(Arc::from))
1373            .collect();
1374
1375        if labels.is_empty() {
1376            return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1377        }
1378
1379        // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1380        let ts_ns = payload["source"]["ts_ns"]
1381            .as_u64()
1382            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1383        let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1384
1385        // Build ElementMetadata
1386        let reference = ElementReference::new(source_id, element_id);
1387        let metadata = ElementMetadata {
1388            reference,
1389            labels: labels.into(),
1390            effective_from,
1391        };
1392
1393        // Handle delete operation (no properties needed)
1394        if op == "d" {
1395            source_changes.push(SourceChangeWithTimestamps {
1396                source_change: SourceChange::Delete { metadata },
1397                reactivator_start_ns,
1398                reactivator_end_ns,
1399            });
1400            continue;
1401        }
1402
1403        // Convert properties
1404        let properties_obj = element_data["properties"]
1405            .as_object()
1406            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1407
1408        let properties = convert_json_to_element_properties(properties_obj)?;
1409
1410        // Build element based on type
1411        let element = match element_type {
1412            "node" => Element::Node {
1413                metadata,
1414                properties,
1415            },
1416            "rel" | "relation" => {
1417                // Extract startId and endId
1418                let start_id = element_data["startId"]
1419                    .as_str()
1420                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1421                let end_id = element_data["endId"]
1422                    .as_str()
1423                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1424
1425                Element::Relation {
1426                    metadata,
1427                    properties,
1428                    out_node: ElementReference::new(source_id, start_id),
1429                    in_node: ElementReference::new(source_id, end_id),
1430                }
1431            }
1432            _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1433        };
1434
1435        // Build SourceChange
1436        let source_change = match op {
1437            "i" => SourceChange::Insert { element },
1438            "u" => SourceChange::Update { element },
1439            _ => unreachable!(),
1440        };
1441
1442        source_changes.push(SourceChangeWithTimestamps {
1443            source_change,
1444            reactivator_start_ns,
1445            reactivator_end_ns,
1446        });
1447    }
1448
1449    Ok(source_changes)
1450}
1451
1452/// Transform CloudEvent-wrapped control event to SourceControl(s)
1453///
1454/// Handles control messages from Query API service with source.db = "Drasi"
1455/// Currently supports "SourceSubscription" control type
1456fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1457    let mut control_events = Vec::new();
1458
1459    // Extract the data array from CloudEvent wrapper
1460    let data_array = cloud_event["data"]
1461        .as_array()
1462        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1463
1464    // Check if control type is supported
1465    if control_type != "SourceSubscription" {
1466        info!(
1467            "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1468        );
1469        return Ok(control_events); // Return empty vec for unknown types
1470    }
1471
1472    // Process each event in the data array
1473    for event in data_array {
1474        // Extract operation type (op field: "i", "u", "d")
1475        let op = event["op"]
1476            .as_str()
1477            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1478
1479        // Extract payload
1480        let payload = &event["payload"];
1481        if payload.is_null() {
1482            warn!("Missing 'payload' field in control event, skipping");
1483            continue;
1484        }
1485
1486        // Get data based on operation
1487        let control_data = match op {
1488            "i" | "u" => &payload["after"],
1489            "d" => &payload["before"],
1490            _ => {
1491                warn!("Unknown operation type in control event: {op}, skipping");
1492                continue;
1493            }
1494        };
1495
1496        if control_data.is_null() {
1497            warn!("Missing control data (after/before) for operation {op}, skipping");
1498            continue;
1499        }
1500
1501        // Extract required fields for SourceSubscription
1502        let query_id = match control_data["queryId"].as_str() {
1503            Some(id) => id.to_string(),
1504            None => {
1505                warn!("Missing required 'queryId' field in control event, skipping");
1506                continue;
1507            }
1508        };
1509
1510        let query_node_id = match control_data["queryNodeId"].as_str() {
1511            Some(id) => id.to_string(),
1512            None => {
1513                warn!("Missing required 'queryNodeId' field in control event, skipping");
1514                continue;
1515            }
1516        };
1517
1518        // Extract optional label arrays (default to empty if missing)
1519        let node_labels = control_data["nodeLabels"]
1520            .as_array()
1521            .map(|arr| {
1522                arr.iter()
1523                    .filter_map(|v| v.as_str().map(String::from))
1524                    .collect()
1525            })
1526            .unwrap_or_default();
1527
1528        let rel_labels = control_data["relLabels"]
1529            .as_array()
1530            .map(|arr| {
1531                arr.iter()
1532                    .filter_map(|v| v.as_str().map(String::from))
1533                    .collect()
1534            })
1535            .unwrap_or_default();
1536
1537        // Map operation to ControlOperation
1538        let operation = match op {
1539            "i" => ControlOperation::Insert,
1540            "u" => ControlOperation::Update,
1541            "d" => ControlOperation::Delete,
1542            _ => unreachable!(), // Already filtered above
1543        };
1544
1545        // Build SourceControl::Subscription
1546        let control_event = SourceControl::Subscription {
1547            query_id,
1548            query_node_id,
1549            node_labels,
1550            rel_labels,
1551            operation,
1552        };
1553
1554        control_events.push(control_event);
1555    }
1556
1557    Ok(control_events)
1558}