Skip to main content

drasi_source_platform/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform Source Plugin for Drasi
16//!
17//! This plugin consumes data change events from Redis Streams, which is the primary
18//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
19//! containing node and relation changes, as well as control events for query subscriptions.
20//!
21//! # Architecture
22//!
23//! The platform source connects to Redis as a consumer group member, enabling:
24//! - **At-least-once delivery**: Messages are acknowledged after processing
25//! - **Horizontal scaling**: Multiple consumers can share the workload
26//! - **Fault tolerance**: Unacknowledged messages are redelivered
27//!
28//! # Configuration
29//!
30//! | Field | Type | Default | Description |
31//! |-------|------|---------|-------------|
32//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
33//! | `stream_key` | string | *required* | Redis stream key to consume from |
34//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
35//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
36//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
37//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
38//!
39//! # Data Format
40//!
41//! The platform source expects CloudEvent-wrapped messages with a `data` array
42//! containing change events. Each event includes an operation type and payload.
43//!
44//! ## Node Insert
45//!
46//! ```json
47//! {
48//!     "data": [{
49//!         "op": "i",
50//!         "payload": {
51//!             "after": {
52//!                 "id": "user-123",
53//!                 "labels": ["User"],
54//!                 "properties": {
55//!                     "name": "Alice",
56//!                     "email": "alice@example.com"
57//!                 }
58//!             },
59//!             "source": {
60//!                 "db": "mydb",
61//!                 "table": "node",
62//!                 "ts_ns": 1699900000000000000
63//!             }
64//!         }
65//!     }]
66//! }
67//! ```
68//!
69//! ## Node Update
70//!
71//! ```json
72//! {
73//!     "data": [{
74//!         "op": "u",
75//!         "payload": {
76//!             "after": {
77//!                 "id": "user-123",
78//!                 "labels": ["User"],
79//!                 "properties": { "name": "Alice Updated" }
80//!             },
81//!             "source": { "table": "node", "ts_ns": 1699900001000000000 }
82//!         }
83//!     }]
84//! }
85//! ```
86//!
87//! ## Node Delete
88//!
89//! ```json
90//! {
91//!     "data": [{
92//!         "op": "d",
93//!         "payload": {
94//!             "before": {
95//!                 "id": "user-123",
96//!                 "labels": ["User"],
97//!                 "properties": {}
98//!             },
99//!             "source": { "table": "node", "ts_ns": 1699900002000000000 }
100//!         }
101//!     }]
102//! }
103//! ```
104//!
105//! ## Relation Insert
106//!
107//! ```json
108//! {
109//!     "data": [{
110//!         "op": "i",
111//!         "payload": {
112//!             "after": {
113//!                 "id": "follows-1",
114//!                 "labels": ["FOLLOWS"],
115//!                 "startId": "user-123",
116//!                 "endId": "user-456",
117//!                 "properties": { "since": "2024-01-01" }
118//!             },
119//!             "source": { "table": "rel", "ts_ns": 1699900003000000000 }
120//!         }
121//!     }]
122//! }
123//! ```
124//!
125//! # Control Events
126//!
127//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
128//! Currently supported control types:
129//!
130//! - **SourceSubscription**: Query subscription management
131//!
132//! # Example Configuration (YAML)
133//!
134//! ```yaml
135//! source_type: platform
136//! properties:
137//!   redis_url: "redis://localhost:6379"
138//!   stream_key: "my-app-changes"
139//!   consumer_group: "drasi-consumers"
140//!   batch_size: 50
141//!   block_ms: 10000
142//! ```
143//!
144//! # Usage Example
145//!
146//! ```rust,ignore
147//! use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
148//! use std::sync::Arc;
149//!
150//! let config = PlatformSourceBuilder::new()
151//!     .with_redis_url("redis://localhost:6379")
152//!     .with_stream_key("my-changes")
153//!     .with_consumer_group("my-consumers")
154//!     .build();
155//!
156//! let source = Arc::new(PlatformSource::new("platform-source", config)?);
157//! drasi.add_source(source).await?;
158//! ```
159
160pub mod config;
161pub use config::PlatformSourceConfig;
162
163use anyhow::Result;
164use log::{debug, error, info, warn};
165use redis::streams::StreamReadReply;
166use serde_json::Value;
167use std::collections::HashMap;
168use std::sync::Arc;
169use std::time::Duration;
170use tokio::sync::RwLock;
171use tokio::task::JoinHandle;
172
173use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
174use drasi_lib::channels::{
175    ComponentEvent, ComponentEventSender, ComponentStatus, ComponentType, ControlOperation,
176    DispatchMode, SourceControl, SourceEvent, SourceEventWrapper, SubscriptionResponse,
177};
178use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
179use drasi_lib::sources::manager::convert_json_to_element_properties;
180use drasi_lib::Source;
181
182#[cfg(test)]
183mod tests;
184
185/// Configuration for the platform source
186#[derive(Debug, Clone)]
187struct PlatformConfig {
188    /// Redis connection URL
189    redis_url: String,
190    /// Redis stream key to read from
191    stream_key: String,
192    /// Consumer group name
193    consumer_group: String,
194    /// Consumer name (should be unique per instance)
195    consumer_name: String,
196    /// Number of events to read per XREADGROUP call
197    batch_size: usize,
198    /// Milliseconds to block waiting for events
199    block_ms: u64,
200    /// Stream position to start from (">" for new, "0" for all)
201    start_id: String,
202    /// Always recreate consumer group on startup (default: false)
203    /// If true, deletes and recreates the consumer group using start_id
204    /// If false, uses existing group position (ignores start_id if group exists)
205    always_create_consumer_group: bool,
206    /// Maximum connection retry attempts
207    max_retries: usize,
208    /// Delay between retries in milliseconds
209    retry_delay_ms: u64,
210}
211
212impl Default for PlatformConfig {
213    fn default() -> Self {
214        Self {
215            redis_url: String::new(),
216            stream_key: String::new(),
217            consumer_group: String::new(),
218            consumer_name: String::new(),
219            batch_size: 10,
220            block_ms: 5000,
221            start_id: ">".to_string(),
222            always_create_consumer_group: false,
223            max_retries: 3,
224            retry_delay_ms: 1000,
225        }
226    }
227}
228
229/// Platform source that reads events from Redis Streams.
230///
231/// This source connects to a Redis instance and consumes CloudEvent-wrapped
232/// messages from a stream using consumer groups. It supports both data events
233/// (node/relation changes) and control events (query subscriptions).
234///
235/// # Fields
236///
237/// - `base`: Common source functionality (dispatchers, status, lifecycle)
238/// - `config`: Platform-specific configuration (Redis connection, stream settings)
239pub struct PlatformSource {
240    /// Base source implementation providing common functionality
241    base: SourceBase,
242    /// Platform source configuration
243    config: PlatformSourceConfig,
244}
245
246/// Builder for creating [`PlatformSource`] instances.
247///
248/// Provides a fluent API for constructing platform sources
249/// with sensible defaults.
250///
251/// # Example
252///
253/// ```rust,ignore
254/// use drasi_source_platform::PlatformSource;
255///
256/// let source = PlatformSource::builder("my-platform-source")
257///     .with_redis_url("redis://localhost:6379")
258///     .with_stream_key("my-app-changes")
259///     .with_consumer_group("my-consumers")
260///     .with_batch_size(50)
261///     .build()?;
262/// ```
263pub struct PlatformSourceBuilder {
264    id: String,
265    redis_url: String,
266    stream_key: String,
267    consumer_group: Option<String>,
268    consumer_name: Option<String>,
269    batch_size: Option<usize>,
270    block_ms: Option<u64>,
271    dispatch_mode: Option<DispatchMode>,
272    dispatch_buffer_capacity: Option<usize>,
273    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
274    auto_start: bool,
275}
276
277impl PlatformSourceBuilder {
278    /// Create a new builder with the given ID and default values.
279    pub fn new(id: impl Into<String>) -> Self {
280        Self {
281            id: id.into(),
282            redis_url: String::new(),
283            stream_key: String::new(),
284            consumer_group: None,
285            consumer_name: None,
286            batch_size: None,
287            block_ms: None,
288            dispatch_mode: None,
289            dispatch_buffer_capacity: None,
290            bootstrap_provider: None,
291            auto_start: true,
292        }
293    }
294
295    /// Set the Redis connection URL.
296    ///
297    /// # Arguments
298    ///
299    /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
300    pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
301        self.redis_url = url.into();
302        self
303    }
304
305    /// Set the Redis stream key to consume from.
306    ///
307    /// # Arguments
308    ///
309    /// * `key` - Name of the Redis stream
310    pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
311        self.stream_key = key.into();
312        self
313    }
314
315    /// Set the consumer group name.
316    ///
317    /// # Arguments
318    ///
319    /// * `group` - Consumer group name (default: `"drasi-core"`)
320    pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
321        self.consumer_group = Some(group.into());
322        self
323    }
324
325    /// Set a unique consumer name within the group.
326    ///
327    /// # Arguments
328    ///
329    /// * `name` - Unique consumer name (auto-generated if not specified)
330    pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
331        self.consumer_name = Some(name.into());
332        self
333    }
334
335    /// Set the batch size for reading events.
336    ///
337    /// # Arguments
338    ///
339    /// * `size` - Number of events to read per XREADGROUP call (default: 100)
340    pub fn with_batch_size(mut self, size: usize) -> Self {
341        self.batch_size = Some(size);
342        self
343    }
344
345    /// Set the block timeout for waiting on events.
346    ///
347    /// # Arguments
348    ///
349    /// * `ms` - Milliseconds to block waiting for events (default: 5000)
350    pub fn with_block_ms(mut self, ms: u64) -> Self {
351        self.block_ms = Some(ms);
352        self
353    }
354
355    /// Set the dispatch mode for this source
356    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
357        self.dispatch_mode = Some(mode);
358        self
359    }
360
361    /// Set the dispatch buffer capacity for this source
362    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
363        self.dispatch_buffer_capacity = Some(capacity);
364        self
365    }
366
367    /// Set the bootstrap provider for this source
368    pub fn with_bootstrap_provider(
369        mut self,
370        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
371    ) -> Self {
372        self.bootstrap_provider = Some(Box::new(provider));
373        self
374    }
375
376    /// Set whether this source should auto-start when DrasiLib starts.
377    ///
378    /// Default is `true`. Set to `false` if this source should only be
379    /// started manually via `start_source()`.
380    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
381        self.auto_start = auto_start;
382        self
383    }
384
385    /// Set the full configuration at once
386    pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
387        self.redis_url = config.redis_url;
388        self.stream_key = config.stream_key;
389        self.consumer_group = Some(config.consumer_group);
390        self.consumer_name = config.consumer_name;
391        self.batch_size = Some(config.batch_size);
392        self.block_ms = Some(config.block_ms);
393        self
394    }
395
396    /// Build the platform source.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the source cannot be constructed.
401    pub fn build(self) -> Result<PlatformSource> {
402        let config = PlatformSourceConfig {
403            redis_url: self.redis_url,
404            stream_key: self.stream_key,
405            consumer_group: self
406                .consumer_group
407                .unwrap_or_else(|| "drasi-core".to_string()),
408            consumer_name: self.consumer_name,
409            batch_size: self.batch_size.unwrap_or(100),
410            block_ms: self.block_ms.unwrap_or(5000),
411        };
412
413        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
414        if let Some(mode) = self.dispatch_mode {
415            params = params.with_dispatch_mode(mode);
416        }
417        if let Some(capacity) = self.dispatch_buffer_capacity {
418            params = params.with_dispatch_buffer_capacity(capacity);
419        }
420        if let Some(provider) = self.bootstrap_provider {
421            params = params.with_bootstrap_provider(provider);
422        }
423
424        Ok(PlatformSource {
425            base: SourceBase::new(params)?,
426            config,
427        })
428    }
429}
430
431impl PlatformSource {
432    /// Create a builder for PlatformSource
433    ///
434    /// # Example
435    ///
436    /// ```rust,ignore
437    /// use drasi_source_platform::PlatformSource;
438    ///
439    /// let source = PlatformSource::builder("my-platform-source")
440    ///     .with_redis_url("redis://localhost:6379")
441    ///     .with_stream_key("my-changes")
442    ///     .with_bootstrap_provider(my_provider)
443    ///     .build()?;
444    /// ```
445    pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
446        PlatformSourceBuilder::new(id)
447    }
448
449    /// Create a new platform source.
450    ///
451    /// The event channel is automatically injected when the source is added
452    /// to DrasiLib via `add_source()`.
453    ///
454    /// # Arguments
455    ///
456    /// * `id` - Unique identifier for this source instance
457    /// * `config` - Platform source configuration
458    ///
459    /// # Returns
460    ///
461    /// A new `PlatformSource` instance, or an error if construction fails.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the base source cannot be initialized.
466    ///
467    /// # Example
468    ///
469    /// ```rust,ignore
470    /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
471    ///
472    /// let config = PlatformSourceBuilder::new("my-platform-source")
473    ///     .with_redis_url("redis://localhost:6379")
474    ///     .with_stream_key("my-changes")
475    ///     .build()?;
476    /// ```
477    pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
478        let id = id.into();
479        let params = SourceBaseParams::new(id);
480        Ok(Self {
481            base: SourceBase::new(params)?,
482            config,
483        })
484    }
485
486    /// Subscribe to source events (for testing)
487    ///
488    /// This method is intended for use in tests to receive events broadcast by the source.
489    /// In production, queries subscribe to sources through the SourceManager.
490    /// Parse configuration from properties
491    #[allow(dead_code)]
492    fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
493        // Extract required fields first
494        let redis_url = properties
495            .get("redis_url")
496            .and_then(|v| v.as_str())
497            .ok_or_else(|| {
498                anyhow::anyhow!(
499                    "Configuration error: Missing required field 'redis_url'. \
500                 Platform source requires a Redis connection URL"
501                )
502            })?
503            .to_string();
504
505        let stream_key = properties
506            .get("stream_key")
507            .and_then(|v| v.as_str())
508            .ok_or_else(|| {
509                anyhow::anyhow!(
510                    "Configuration error: Missing required field 'stream_key'. \
511                 Platform source requires a Redis Stream key to read from"
512                )
513            })?
514            .to_string();
515
516        let consumer_group = properties
517            .get("consumer_group")
518            .and_then(|v| v.as_str())
519            .ok_or_else(|| {
520                anyhow::anyhow!(
521                    "Configuration error: Missing required field 'consumer_group'. \
522                 Platform source requires a consumer group name"
523                )
524            })?
525            .to_string();
526
527        let consumer_name = properties
528            .get("consumer_name")
529            .and_then(|v| v.as_str())
530            .ok_or_else(|| {
531                anyhow::anyhow!(
532                    "Configuration error: Missing required field 'consumer_name'. \
533                 Platform source requires a unique consumer name"
534                )
535            })?
536            .to_string();
537
538        // Get defaults for optional field handling
539        let defaults = PlatformConfig::default();
540
541        // Build config with optional fields
542        let config = PlatformConfig {
543            redis_url,
544            stream_key,
545            consumer_group,
546            consumer_name,
547            batch_size: properties
548                .get("batch_size")
549                .and_then(|v| v.as_u64())
550                .map(|v| v as usize)
551                .unwrap_or(defaults.batch_size),
552            block_ms: properties
553                .get("block_ms")
554                .and_then(|v| v.as_u64())
555                .unwrap_or(defaults.block_ms),
556            start_id: properties
557                .get("start_id")
558                .and_then(|v| v.as_str())
559                .map(|s| s.to_string())
560                .unwrap_or(defaults.start_id),
561            always_create_consumer_group: properties
562                .get("always_create_consumer_group")
563                .and_then(|v| v.as_bool())
564                .unwrap_or(defaults.always_create_consumer_group),
565            max_retries: properties
566                .get("max_retries")
567                .and_then(|v| v.as_u64())
568                .map(|v| v as usize)
569                .unwrap_or(defaults.max_retries),
570            retry_delay_ms: properties
571                .get("retry_delay_ms")
572                .and_then(|v| v.as_u64())
573                .unwrap_or(defaults.retry_delay_ms),
574        };
575
576        // Validate
577        if config.redis_url.is_empty() {
578            return Err(anyhow::anyhow!(
579                "Validation error: redis_url cannot be empty. \
580                 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
581            ));
582        }
583        if config.stream_key.is_empty() {
584            return Err(anyhow::anyhow!(
585                "Validation error: stream_key cannot be empty. \
586                 Please specify the Redis Stream key to read from"
587            ));
588        }
589        if config.consumer_group.is_empty() {
590            return Err(anyhow::anyhow!(
591                "Validation error: consumer_group cannot be empty. \
592                 Please specify a consumer group name for this source"
593            ));
594        }
595        if config.consumer_name.is_empty() {
596            return Err(anyhow::anyhow!(
597                "Validation error: consumer_name cannot be empty. \
598                 Please specify a unique consumer name within the consumer group"
599            ));
600        }
601
602        Ok(config)
603    }
604
605    /// Connect to Redis with retry logic
606    async fn connect_with_retry(
607        redis_url: &str,
608        max_retries: usize,
609        retry_delay_ms: u64,
610    ) -> Result<redis::aio::MultiplexedConnection> {
611        let client = redis::Client::open(redis_url)?;
612        let mut delay = retry_delay_ms;
613
614        for attempt in 0..max_retries {
615            match client.get_multiplexed_async_connection().await {
616                Ok(conn) => {
617                    info!("Successfully connected to Redis");
618                    return Ok(conn);
619                }
620                Err(e) if attempt < max_retries - 1 => {
621                    warn!(
622                        "Redis connection failed (attempt {}/{}): {}",
623                        attempt + 1,
624                        max_retries,
625                        e
626                    );
627                    tokio::time::sleep(Duration::from_millis(delay)).await;
628                    delay *= 2; // Exponential backoff
629                }
630                Err(e) => {
631                    return Err(anyhow::anyhow!(
632                        "Failed to connect to Redis after {max_retries} attempts: {e}"
633                    ));
634                }
635            }
636        }
637
638        unreachable!()
639    }
640
641    /// Create or recreate consumer group based on configuration
642    async fn create_consumer_group(
643        conn: &mut redis::aio::MultiplexedConnection,
644        stream_key: &str,
645        consumer_group: &str,
646        start_id: &str,
647        always_create: bool,
648    ) -> Result<()> {
649        // Determine the initial position for the consumer group
650        let group_start_id = if start_id == ">" {
651            "$" // ">" means only new messages, so create group at end
652        } else {
653            start_id // "0" or specific ID
654        };
655
656        // If always_create is true, delete the existing group first
657        if always_create {
658            info!(
659                "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
660            );
661
662            let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
663                .arg("DESTROY")
664                .arg(stream_key)
665                .arg(consumer_group)
666                .query_async(conn)
667                .await;
668
669            match destroy_result {
670                Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
671                Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
672                Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
673                Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
674            }
675        }
676
677        // Try to create the consumer group
678        let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
679            .arg("CREATE")
680            .arg(stream_key)
681            .arg(consumer_group)
682            .arg(group_start_id)
683            .arg("MKSTREAM")
684            .query_async(conn)
685            .await;
686
687        match result {
688            Ok(_) => {
689                info!(
690                    "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
691                );
692                Ok(())
693            }
694            Err(e) => {
695                // BUSYGROUP error means the group already exists
696                if e.to_string().contains("BUSYGROUP") {
697                    info!(
698                        "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
699                    );
700                    Ok(())
701                } else {
702                    Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
703                }
704            }
705        }
706    }
707
708    /// Start the stream consumer task
709    async fn start_consumer_task(
710        source_id: String,
711        platform_config: PlatformConfig,
712        dispatchers: Arc<
713            RwLock<
714                Vec<
715                    Box<
716                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
717                    >,
718                >,
719            >,
720        >,
721        event_tx: Arc<RwLock<Option<ComponentEventSender>>>,
722        status: Arc<RwLock<ComponentStatus>>,
723    ) -> JoinHandle<()> {
724        tokio::spawn(async move {
725            info!(
726                "Starting platform source consumer for source '{}' on stream '{}'",
727                source_id, platform_config.stream_key
728            );
729
730            // Connect to Redis
731            let mut conn = match Self::connect_with_retry(
732                &platform_config.redis_url,
733                platform_config.max_retries,
734                platform_config.retry_delay_ms,
735            )
736            .await
737            {
738                Ok(conn) => conn,
739                Err(e) => {
740                    error!("Failed to connect to Redis: {e}");
741                    if let Some(ref tx) = *event_tx.read().await {
742                        let _ = tx
743                            .send(ComponentEvent {
744                                component_id: source_id.clone(),
745                                component_type: ComponentType::Source,
746                                status: ComponentStatus::Stopped,
747                                timestamp: chrono::Utc::now(),
748                                message: Some(format!("Failed to connect to Redis: {e}")),
749                            })
750                            .await;
751                    }
752                    *status.write().await = ComponentStatus::Stopped;
753                    return;
754                }
755            };
756
757            // Create consumer group
758            if let Err(e) = Self::create_consumer_group(
759                &mut conn,
760                &platform_config.stream_key,
761                &platform_config.consumer_group,
762                &platform_config.start_id,
763                platform_config.always_create_consumer_group,
764            )
765            .await
766            {
767                error!("Failed to create consumer group: {e}");
768                if let Some(ref tx) = *event_tx.read().await {
769                    let _ = tx
770                        .send(ComponentEvent {
771                            component_id: source_id.clone(),
772                            component_type: ComponentType::Source,
773                            status: ComponentStatus::Stopped,
774                            timestamp: chrono::Utc::now(),
775                            message: Some(format!("Failed to create consumer group: {e}")),
776                        })
777                        .await;
778                }
779                *status.write().await = ComponentStatus::Stopped;
780                return;
781            }
782
783            // Main consumer loop
784            loop {
785                // Read from stream using ">" to get next undelivered messages for this consumer group
786                let read_result: Result<StreamReadReply, redis::RedisError> =
787                    redis::cmd("XREADGROUP")
788                        .arg("GROUP")
789                        .arg(&platform_config.consumer_group)
790                        .arg(&platform_config.consumer_name)
791                        .arg("COUNT")
792                        .arg(platform_config.batch_size)
793                        .arg("BLOCK")
794                        .arg(platform_config.block_ms)
795                        .arg("STREAMS")
796                        .arg(&platform_config.stream_key)
797                        .arg(">") // Always use ">" for consumer group reads
798                        .query_async(&mut conn)
799                        .await;
800
801                match read_result {
802                    Ok(reply) => {
803                        // Collect all stream IDs for batch acknowledgment
804                        let mut all_stream_ids = Vec::new();
805
806                        // Process each stream entry
807                        for stream_key in reply.keys {
808                            for stream_id in stream_key.ids {
809                                debug!("Received event from stream: {}", stream_id.id);
810
811                                // Store stream ID for batch acknowledgment
812                                all_stream_ids.push(stream_id.id.clone());
813
814                                // Extract event data
815                                match extract_event_data(&stream_id.map) {
816                                    Ok(event_json) => {
817                                        // Parse JSON
818                                        match serde_json::from_str::<Value>(&event_json) {
819                                            Ok(cloud_event) => {
820                                                // Detect message type
821                                                let message_type =
822                                                    detect_message_type(&cloud_event);
823
824                                                match message_type {
825                                                    MessageType::Control(control_type) => {
826                                                        // Handle control message
827                                                        debug!(
828                                                            "Detected control message of type: {control_type}"
829                                                        );
830
831                                                        match transform_control_event(
832                                                            cloud_event,
833                                                            &control_type,
834                                                        ) {
835                                                            Ok(control_events) => {
836                                                                // Publish control events
837                                                                for control_event in control_events
838                                                                {
839                                                                    // Create profiling metadata with timestamps
840                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
841                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
842
843                                                                    let wrapper = SourceEventWrapper::with_profiling(
844                                                                        source_id.clone(),
845                                                                        SourceEvent::Control(control_event),
846                                                                        chrono::Utc::now(),
847                                                                        profiling,
848                                                                    );
849
850                                                                    // Dispatch via helper
851                                                                    if let Err(e) = SourceBase::dispatch_from_task(
852                                                                        dispatchers.clone(),
853                                                                        wrapper,
854                                                                        &source_id,
855                                                                    )
856                                                                    .await
857                                                                    {
858                                                                        debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
859                                                                    } else {
860                                                                        debug!(
861                                                                            "Published control event for stream {}",
862                                                                            stream_id.id
863                                                                        );
864                                                                    }
865                                                                }
866                                                            }
867                                                            Err(e) => {
868                                                                warn!(
869                                                                    "Failed to transform control event {}: {}",
870                                                                    stream_id.id, e
871                                                                );
872                                                            }
873                                                        }
874                                                    }
875                                                    MessageType::Data => {
876                                                        // Handle data message
877                                                        match transform_platform_event(
878                                                            cloud_event,
879                                                            &source_id,
880                                                        ) {
881                                                            Ok(source_changes_with_timestamps) => {
882                                                                // Publish source changes
883                                                                for item in
884                                                                    source_changes_with_timestamps
885                                                                {
886                                                                    // Create profiling metadata with timestamps
887                                                                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
888                                                                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
889
890                                                                    // Extract source_ns from SourceChange transaction time
891                                                                    profiling.source_ns = Some(
892                                                                        item.source_change
893                                                                            .get_transaction_time(),
894                                                                    );
895
896                                                                    // Set reactivator timestamps from event
897                                                                    profiling
898                                                                        .reactivator_start_ns =
899                                                                        item.reactivator_start_ns;
900                                                                    profiling.reactivator_end_ns =
901                                                                        item.reactivator_end_ns;
902
903                                                                    let wrapper = SourceEventWrapper::with_profiling(
904                                                                        source_id.clone(),
905                                                                        SourceEvent::Change(item.source_change),
906                                                                        chrono::Utc::now(),
907                                                                        profiling,
908                                                                    );
909
910                                                                    // Dispatch via helper
911                                                                    if let Err(e) = SourceBase::dispatch_from_task(
912                                                                        dispatchers.clone(),
913                                                                        wrapper,
914                                                                        &source_id,
915                                                                    )
916                                                                    .await
917                                                                    {
918                                                                        debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
919                                                                    } else {
920                                                                        debug!(
921                                                                            "Published source change for event {}",
922                                                                            stream_id.id
923                                                                        );
924                                                                    }
925                                                                }
926                                                            }
927                                                            Err(e) => {
928                                                                warn!(
929                                                                    "Failed to transform event {}: {}",
930                                                                    stream_id.id, e
931                                                                );
932                                                                if let Some(ref tx) =
933                                                                    *event_tx.read().await
934                                                                {
935                                                                    let _ = tx
936                                                                        .send(ComponentEvent {
937                                                                            component_id: source_id.clone(),
938                                                                            component_type:
939                                                                                ComponentType::Source,
940                                                                            status: ComponentStatus::Running,
941                                                                            timestamp: chrono::Utc::now(),
942                                                                            message: Some(format!(
943                                                                                "Transformation error: {e}"
944                                                                            )),
945                                                                        })
946                                                                        .await;
947                                                                }
948                                                            }
949                                                        }
950                                                    }
951                                                }
952                                            }
953                                            Err(e) => {
954                                                warn!(
955                                                    "Failed to parse JSON for event {}: {}",
956                                                    stream_id.id, e
957                                                );
958                                            }
959                                        }
960                                    }
961                                    Err(e) => {
962                                        warn!(
963                                            "Failed to extract event data from {}: {}",
964                                            stream_id.id, e
965                                        );
966                                    }
967                                }
968                            }
969                        }
970
971                        // Batch acknowledge all messages at once
972                        if !all_stream_ids.is_empty() {
973                            debug!("Acknowledging batch of {} messages", all_stream_ids.len());
974
975                            let mut cmd = redis::cmd("XACK");
976                            cmd.arg(&platform_config.stream_key)
977                                .arg(&platform_config.consumer_group);
978
979                            // Add all stream IDs to the command
980                            for stream_id in &all_stream_ids {
981                                cmd.arg(stream_id);
982                            }
983
984                            match cmd.query_async::<_, i64>(&mut conn).await {
985                                Ok(ack_count) => {
986                                    debug!("Successfully acknowledged {ack_count} messages");
987                                    if ack_count as usize != all_stream_ids.len() {
988                                        warn!(
989                                            "Acknowledged {} messages but expected {}",
990                                            ack_count,
991                                            all_stream_ids.len()
992                                        );
993                                    }
994                                }
995                                Err(e) => {
996                                    error!("Failed to acknowledge message batch: {e}");
997
998                                    // Fallback: Try acknowledging messages individually
999                                    warn!("Falling back to individual acknowledgments");
1000                                    for stream_id in &all_stream_ids {
1001                                        match redis::cmd("XACK")
1002                                            .arg(&platform_config.stream_key)
1003                                            .arg(&platform_config.consumer_group)
1004                                            .arg(stream_id)
1005                                            .query_async::<_, i64>(&mut conn)
1006                                            .await
1007                                        {
1008                                            Ok(_) => {
1009                                                debug!(
1010                                                    "Individually acknowledged message {stream_id}"
1011                                                );
1012                                            }
1013                                            Err(e) => {
1014                                                error!(
1015                                                    "Failed to individually acknowledge message {stream_id}: {e}"
1016                                                );
1017                                            }
1018                                        }
1019                                    }
1020                                }
1021                            }
1022                        }
1023                    }
1024                    Err(e) => {
1025                        // Check if it's a connection error
1026                        if is_connection_error(&e) {
1027                            error!("Redis connection lost: {e}");
1028                            if let Some(ref tx) = *event_tx.read().await {
1029                                let _ = tx
1030                                    .send(ComponentEvent {
1031                                        component_id: source_id.clone(),
1032                                        component_type: ComponentType::Source,
1033                                        status: ComponentStatus::Running,
1034                                        timestamp: chrono::Utc::now(),
1035                                        message: Some(format!("Redis connection lost: {e}")),
1036                                    })
1037                                    .await;
1038                            }
1039
1040                            // Try to reconnect
1041                            match Self::connect_with_retry(
1042                                &platform_config.redis_url,
1043                                platform_config.max_retries,
1044                                platform_config.retry_delay_ms,
1045                            )
1046                            .await
1047                            {
1048                                Ok(new_conn) => {
1049                                    conn = new_conn;
1050                                    info!("Reconnected to Redis");
1051                                }
1052                                Err(e) => {
1053                                    error!("Failed to reconnect to Redis: {e}");
1054                                    *status.write().await = ComponentStatus::Stopped;
1055                                    return;
1056                                }
1057                            }
1058                        } else if !e.to_string().contains("timeout") {
1059                            // Log non-timeout errors
1060                            error!("Error reading from stream: {e}");
1061                        }
1062                    }
1063                }
1064            }
1065        })
1066    }
1067}
1068
1069#[async_trait::async_trait]
1070impl Source for PlatformSource {
1071    fn id(&self) -> &str {
1072        &self.base.id
1073    }
1074
1075    fn type_name(&self) -> &str {
1076        "platform"
1077    }
1078
1079    fn properties(&self) -> HashMap<String, serde_json::Value> {
1080        let mut props = HashMap::new();
1081        props.insert(
1082            "redis_url".to_string(),
1083            serde_json::Value::String(self.config.redis_url.clone()),
1084        );
1085        props.insert(
1086            "stream_key".to_string(),
1087            serde_json::Value::String(self.config.stream_key.clone()),
1088        );
1089        props.insert(
1090            "consumer_group".to_string(),
1091            serde_json::Value::String(self.config.consumer_group.clone()),
1092        );
1093        if let Some(ref consumer_name) = self.config.consumer_name {
1094            props.insert(
1095                "consumer_name".to_string(),
1096                serde_json::Value::String(consumer_name.clone()),
1097            );
1098        }
1099        props.insert(
1100            "batch_size".to_string(),
1101            serde_json::Value::Number(self.config.batch_size.into()),
1102        );
1103        props.insert(
1104            "block_ms".to_string(),
1105            serde_json::Value::Number(self.config.block_ms.into()),
1106        );
1107        props
1108    }
1109
1110    fn auto_start(&self) -> bool {
1111        self.base.get_auto_start()
1112    }
1113
1114    async fn start(&self) -> Result<()> {
1115        info!("Starting platform source: {}", self.base.id);
1116
1117        // Extract configuration from typed config
1118        let platform_config = PlatformConfig {
1119            redis_url: self.config.redis_url.clone(),
1120            stream_key: self.config.stream_key.clone(),
1121            consumer_group: self.config.consumer_group.clone(),
1122            consumer_name: self
1123                .config
1124                .consumer_name
1125                .clone()
1126                .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1127            batch_size: self.config.batch_size,
1128            block_ms: self.config.block_ms,
1129            start_id: ">".to_string(),
1130            always_create_consumer_group: false,
1131            max_retries: 5,
1132            retry_delay_ms: 1000,
1133        };
1134
1135        // Update status
1136        *self.base.status.write().await = ComponentStatus::Running;
1137
1138        // Start consumer task
1139        let task = Self::start_consumer_task(
1140            self.base.id.clone(),
1141            platform_config,
1142            self.base.dispatchers.clone(),
1143            self.base.status_tx(),
1144            self.base.status.clone(),
1145        )
1146        .await;
1147
1148        *self.base.task_handle.write().await = Some(task);
1149
1150        Ok(())
1151    }
1152
1153    async fn stop(&self) -> Result<()> {
1154        info!("Stopping platform source: {}", self.base.id);
1155
1156        // Cancel the task
1157        if let Some(handle) = self.base.task_handle.write().await.take() {
1158            handle.abort();
1159        }
1160
1161        // Update status
1162        *self.base.status.write().await = ComponentStatus::Stopped;
1163
1164        Ok(())
1165    }
1166
1167    async fn status(&self) -> ComponentStatus {
1168        self.base.status.read().await.clone()
1169    }
1170
1171    async fn subscribe(
1172        &self,
1173        settings: drasi_lib::config::SourceSubscriptionSettings,
1174    ) -> Result<SubscriptionResponse> {
1175        self.base
1176            .subscribe_with_bootstrap(&settings, "Platform")
1177            .await
1178    }
1179
1180    fn as_any(&self) -> &dyn std::any::Any {
1181        self
1182    }
1183
1184    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1185        self.base.initialize(context).await;
1186    }
1187
1188    async fn set_bootstrap_provider(
1189        &self,
1190        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1191    ) {
1192        self.base.set_bootstrap_provider(provider).await;
1193    }
1194}
1195
1196impl PlatformSource {
1197    /// Create a test subscription to this source (synchronous)
1198    ///
1199    /// This method delegates to SourceBase and is provided for convenience in tests.
1200    /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1201    pub fn test_subscribe(
1202        &self,
1203    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1204        self.base.test_subscribe()
1205    }
1206
1207    /// Create a test subscription to this source (async)
1208    ///
1209    /// This method delegates to SourceBase and is provided for convenience in async tests.
1210    /// Prefer this method over test_subscribe() in async contexts.
1211    pub async fn test_subscribe_async(
1212        &self,
1213    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1214        self.base
1215            .create_streaming_receiver()
1216            .await
1217            .expect("Failed to create test subscription")
1218    }
1219}
1220
1221/// Extract event data from Redis stream entry
1222fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1223    // Look for common field names
1224    for key in &["data", "event", "payload", "message"] {
1225        if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1226            return String::from_utf8(data.clone())
1227                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1228        }
1229    }
1230
1231    Err(anyhow::anyhow!(
1232        "No event data found in stream entry. Available keys: {:?}",
1233        entry_map.keys().collect::<Vec<_>>()
1234    ))
1235}
1236
1237/// Check if error is a connection error
1238fn is_connection_error(e: &redis::RedisError) -> bool {
1239    e.is_connection_dropped()
1240        || e.is_io_error()
1241        || e.to_string().contains("connection")
1242        || e.to_string().contains("EOF")
1243}
1244
1245/// Message type discriminator
1246#[derive(Debug, Clone, PartialEq)]
1247enum MessageType {
1248    /// Control message with control type from source.table
1249    Control(String),
1250    /// Data message (node/relation change)
1251    Data,
1252}
1253
1254/// Detect message type based on payload.source.db field
1255///
1256/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1257/// Returns MessageType::Data for all other cases
1258fn detect_message_type(cloud_event: &Value) -> MessageType {
1259    // Extract data array and get first event to check message type
1260    let data_array = match cloud_event["data"].as_array() {
1261        Some(arr) if !arr.is_empty() => arr,
1262        _ => return MessageType::Data, // Default to data if no data array
1263    };
1264
1265    // Check the first event's source.db field
1266    let first_event = &data_array[0];
1267    let source_db = first_event["payload"]["source"]["db"]
1268        .as_str()
1269        .unwrap_or("");
1270
1271    // Case-insensitive comparison with "Drasi"
1272    if source_db.eq_ignore_ascii_case("drasi") {
1273        // Extract source.table to determine control type
1274        let control_type = first_event["payload"]["source"]["table"]
1275            .as_str()
1276            .unwrap_or("Unknown")
1277            .to_string();
1278        MessageType::Control(control_type)
1279    } else {
1280        MessageType::Data
1281    }
1282}
1283
1284/// Helper struct to hold SourceChange along with reactivator timestamps
1285#[derive(Debug)]
1286struct SourceChangeWithTimestamps {
1287    source_change: SourceChange,
1288    reactivator_start_ns: Option<u64>,
1289    reactivator_end_ns: Option<u64>,
1290}
1291
1292/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1293///
1294/// Handles events in CloudEvent format with a data array containing change events.
1295/// Each event in the data array has: op, payload.after/before, payload.source
1296fn transform_platform_event(
1297    cloud_event: Value,
1298    source_id: &str,
1299) -> Result<Vec<SourceChangeWithTimestamps>> {
1300    let mut source_changes = Vec::new();
1301
1302    // Extract the data array from CloudEvent wrapper
1303    let data_array = cloud_event["data"]
1304        .as_array()
1305        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1306
1307    // Process each event in the data array
1308    for event in data_array {
1309        // Extract reactivator timestamps from top-level event fields
1310        let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1311        let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1312
1313        // Extract operation type (op field: "i", "u", "d")
1314        let op = event["op"]
1315            .as_str()
1316            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1317
1318        // Extract payload
1319        let payload = &event["payload"];
1320        if payload.is_null() {
1321            return Err(anyhow::anyhow!("Missing 'payload' field"));
1322        }
1323
1324        // Extract element type from payload.source.table
1325        let element_type = payload["source"]["table"]
1326            .as_str()
1327            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1328
1329        // Get element data based on operation
1330        let element_data = match op {
1331            "i" | "u" => &payload["after"],
1332            "d" => &payload["before"],
1333            _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1334        };
1335
1336        if element_data.is_null() {
1337            return Err(anyhow::anyhow!(
1338                "Missing element data (after/before) for operation {op}"
1339            ));
1340        }
1341
1342        // Extract element ID
1343        let element_id = element_data["id"]
1344            .as_str()
1345            .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1346
1347        // Extract labels
1348        let labels_array = element_data["labels"]
1349            .as_array()
1350            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1351
1352        let labels: Vec<Arc<str>> = labels_array
1353            .iter()
1354            .filter_map(|v| v.as_str().map(Arc::from))
1355            .collect();
1356
1357        if labels.is_empty() {
1358            return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1359        }
1360
1361        // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1362        let ts_ns = payload["source"]["ts_ns"]
1363            .as_u64()
1364            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1365        let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1366
1367        // Build ElementMetadata
1368        let reference = ElementReference::new(source_id, element_id);
1369        let metadata = ElementMetadata {
1370            reference,
1371            labels: labels.into(),
1372            effective_from,
1373        };
1374
1375        // Handle delete operation (no properties needed)
1376        if op == "d" {
1377            source_changes.push(SourceChangeWithTimestamps {
1378                source_change: SourceChange::Delete { metadata },
1379                reactivator_start_ns,
1380                reactivator_end_ns,
1381            });
1382            continue;
1383        }
1384
1385        // Convert properties
1386        let properties_obj = element_data["properties"]
1387            .as_object()
1388            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1389
1390        let properties = convert_json_to_element_properties(properties_obj)?;
1391
1392        // Build element based on type
1393        let element = match element_type {
1394            "node" => Element::Node {
1395                metadata,
1396                properties,
1397            },
1398            "rel" | "relation" => {
1399                // Extract startId and endId
1400                let start_id = element_data["startId"]
1401                    .as_str()
1402                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1403                let end_id = element_data["endId"]
1404                    .as_str()
1405                    .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1406
1407                Element::Relation {
1408                    metadata,
1409                    properties,
1410                    out_node: ElementReference::new(source_id, start_id),
1411                    in_node: ElementReference::new(source_id, end_id),
1412                }
1413            }
1414            _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1415        };
1416
1417        // Build SourceChange
1418        let source_change = match op {
1419            "i" => SourceChange::Insert { element },
1420            "u" => SourceChange::Update { element },
1421            _ => unreachable!(),
1422        };
1423
1424        source_changes.push(SourceChangeWithTimestamps {
1425            source_change,
1426            reactivator_start_ns,
1427            reactivator_end_ns,
1428        });
1429    }
1430
1431    Ok(source_changes)
1432}
1433
1434/// Transform CloudEvent-wrapped control event to SourceControl(s)
1435///
1436/// Handles control messages from Query API service with source.db = "Drasi"
1437/// Currently supports "SourceSubscription" control type
1438fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1439    let mut control_events = Vec::new();
1440
1441    // Extract the data array from CloudEvent wrapper
1442    let data_array = cloud_event["data"]
1443        .as_array()
1444        .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1445
1446    // Check if control type is supported
1447    if control_type != "SourceSubscription" {
1448        info!(
1449            "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1450        );
1451        return Ok(control_events); // Return empty vec for unknown types
1452    }
1453
1454    // Process each event in the data array
1455    for event in data_array {
1456        // Extract operation type (op field: "i", "u", "d")
1457        let op = event["op"]
1458            .as_str()
1459            .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1460
1461        // Extract payload
1462        let payload = &event["payload"];
1463        if payload.is_null() {
1464            warn!("Missing 'payload' field in control event, skipping");
1465            continue;
1466        }
1467
1468        // Get data based on operation
1469        let control_data = match op {
1470            "i" | "u" => &payload["after"],
1471            "d" => &payload["before"],
1472            _ => {
1473                warn!("Unknown operation type in control event: {op}, skipping");
1474                continue;
1475            }
1476        };
1477
1478        if control_data.is_null() {
1479            warn!("Missing control data (after/before) for operation {op}, skipping");
1480            continue;
1481        }
1482
1483        // Extract required fields for SourceSubscription
1484        let query_id = match control_data["queryId"].as_str() {
1485            Some(id) => id.to_string(),
1486            None => {
1487                warn!("Missing required 'queryId' field in control event, skipping");
1488                continue;
1489            }
1490        };
1491
1492        let query_node_id = match control_data["queryNodeId"].as_str() {
1493            Some(id) => id.to_string(),
1494            None => {
1495                warn!("Missing required 'queryNodeId' field in control event, skipping");
1496                continue;
1497            }
1498        };
1499
1500        // Extract optional label arrays (default to empty if missing)
1501        let node_labels = control_data["nodeLabels"]
1502            .as_array()
1503            .map(|arr| {
1504                arr.iter()
1505                    .filter_map(|v| v.as_str().map(String::from))
1506                    .collect()
1507            })
1508            .unwrap_or_default();
1509
1510        let rel_labels = control_data["relLabels"]
1511            .as_array()
1512            .map(|arr| {
1513                arr.iter()
1514                    .filter_map(|v| v.as_str().map(String::from))
1515                    .collect()
1516            })
1517            .unwrap_or_default();
1518
1519        // Map operation to ControlOperation
1520        let operation = match op {
1521            "i" => ControlOperation::Insert,
1522            "u" => ControlOperation::Update,
1523            "d" => ControlOperation::Delete,
1524            _ => unreachable!(), // Already filtered above
1525        };
1526
1527        // Build SourceControl::Subscription
1528        let control_event = SourceControl::Subscription {
1529            query_id,
1530            query_node_id,
1531            node_labels,
1532            rel_labels,
1533            operation,
1534        };
1535
1536        control_events.push(control_event);
1537    }
1538
1539    Ok(control_events)
1540}