drasi_source_platform/lib.rs
1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Platform Source Plugin for Drasi
17//!
18//! This plugin consumes data change events from Redis Streams, which is the primary
19//! integration point for the Drasi platform. It supports CloudEvent-wrapped messages
20//! containing node and relation changes, as well as control events for query subscriptions.
21//!
22//! # Architecture
23//!
24//! The platform source connects to Redis as a consumer group member, enabling:
25//! - **At-least-once delivery**: Messages are acknowledged after processing
26//! - **Horizontal scaling**: Multiple consumers can share the workload
27//! - **Fault tolerance**: Unacknowledged messages are redelivered
28//!
29//! # Configuration
30//!
31//! | Field | Type | Default | Description |
32//! |-------|------|---------|-------------|
33//! | `redis_url` | string | *required* | Redis connection URL (e.g., `redis://localhost:6379`) |
34//! | `stream_key` | string | *required* | Redis stream key to consume from |
35//! | `consumer_group` | string | `"drasi-core"` | Consumer group name |
36//! | `consumer_name` | string | auto-generated | Unique consumer name within the group |
37//! | `batch_size` | usize | `100` | Number of events to read per XREADGROUP call |
38//! | `block_ms` | u64 | `5000` | Milliseconds to block waiting for events |
39//!
40//! # Data Format
41//!
42//! The platform source expects CloudEvent-wrapped messages with a `data` array
43//! containing change events. Each event includes an operation type and payload.
44//!
45//! ## Node Insert
46//!
47//! ```json
48//! {
49//! "data": [{
50//! "op": "i",
51//! "payload": {
52//! "after": {
53//! "id": "user-123",
54//! "labels": ["User"],
55//! "properties": {
56//! "name": "Alice",
57//! "email": "alice@example.com"
58//! }
59//! },
60//! "source": {
61//! "db": "mydb",
62//! "table": "node",
63//! "ts_ns": 1699900000000000000
64//! }
65//! }
66//! }]
67//! }
68//! ```
69//!
70//! ## Node Update
71//!
72//! ```json
73//! {
74//! "data": [{
75//! "op": "u",
76//! "payload": {
77//! "after": {
78//! "id": "user-123",
79//! "labels": ["User"],
80//! "properties": { "name": "Alice Updated" }
81//! },
82//! "source": { "table": "node", "ts_ns": 1699900001000000000 }
83//! }
84//! }]
85//! }
86//! ```
87//!
88//! ## Node Delete
89//!
90//! ```json
91//! {
92//! "data": [{
93//! "op": "d",
94//! "payload": {
95//! "before": {
96//! "id": "user-123",
97//! "labels": ["User"],
98//! "properties": {}
99//! },
100//! "source": { "table": "node", "ts_ns": 1699900002000000000 }
101//! }
102//! }]
103//! }
104//! ```
105//!
106//! ## Relation Insert
107//!
108//! ```json
109//! {
110//! "data": [{
111//! "op": "i",
112//! "payload": {
113//! "after": {
114//! "id": "follows-1",
115//! "labels": ["FOLLOWS"],
116//! "startId": "user-123",
117//! "endId": "user-456",
118//! "properties": { "since": "2024-01-01" }
119//! },
120//! "source": { "table": "rel", "ts_ns": 1699900003000000000 }
121//! }
122//! }]
123//! }
124//! ```
125//!
126//! # Control Events
127//!
128//! Control events are identified by `payload.source.db = "Drasi"` (case-insensitive).
129//! Currently supported control types:
130//!
131//! - **SourceSubscription**: Query subscription management
132//!
133//! # Example Configuration (YAML)
134//!
135//! ```yaml
136//! source_type: platform
137//! properties:
138//! redis_url: "redis://localhost:6379"
139//! stream_key: "my-app-changes"
140//! consumer_group: "drasi-consumers"
141//! batch_size: 50
142//! block_ms: 10000
143//! ```
144//!
145//! # Usage Example
146//!
147//! ```rust,ignore
148//! use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
149//! use std::sync::Arc;
150//!
151//! let config = PlatformSourceBuilder::new()
152//! .with_redis_url("redis://localhost:6379")
153//! .with_stream_key("my-changes")
154//! .with_consumer_group("my-consumers")
155//! .build();
156//!
157//! let source = Arc::new(PlatformSource::new("platform-source", config)?);
158//! drasi.add_source(source).await?;
159//! ```
160
161pub mod config;
162pub mod descriptor;
163pub use config::PlatformSourceConfig;
164
165use anyhow::Result;
166use log::{debug, error, info, warn};
167use redis::streams::StreamReadReply;
168use serde_json::Value;
169use std::collections::HashMap;
170use std::sync::Arc;
171use std::time::Duration;
172use tokio::sync::RwLock;
173use tokio::task::JoinHandle;
174
175use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
176use drasi_lib::channels::{
177 ComponentEvent, ComponentEventSender, ComponentStatus, ComponentType, ControlOperation,
178 DispatchMode, SourceControl, SourceEvent, SourceEventWrapper, SubscriptionResponse,
179};
180use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
181use drasi_lib::sources::manager::convert_json_to_element_properties;
182use drasi_lib::Source;
183use tracing::Instrument;
184
185#[cfg(test)]
186mod tests;
187
188/// Configuration for the platform source
189#[derive(Debug, Clone)]
190struct PlatformConfig {
191 /// Redis connection URL
192 redis_url: String,
193 /// Redis stream key to read from
194 stream_key: String,
195 /// Consumer group name
196 consumer_group: String,
197 /// Consumer name (should be unique per instance)
198 consumer_name: String,
199 /// Number of events to read per XREADGROUP call
200 batch_size: usize,
201 /// Milliseconds to block waiting for events
202 block_ms: u64,
203 /// Stream position to start from (">" for new, "0" for all)
204 start_id: String,
205 /// Always recreate consumer group on startup (default: false)
206 /// If true, deletes and recreates the consumer group using start_id
207 /// If false, uses existing group position (ignores start_id if group exists)
208 always_create_consumer_group: bool,
209 /// Maximum connection retry attempts
210 max_retries: usize,
211 /// Delay between retries in milliseconds
212 retry_delay_ms: u64,
213}
214
215impl Default for PlatformConfig {
216 fn default() -> Self {
217 Self {
218 redis_url: String::new(),
219 stream_key: String::new(),
220 consumer_group: String::new(),
221 consumer_name: String::new(),
222 batch_size: 10,
223 block_ms: 5000,
224 start_id: ">".to_string(),
225 always_create_consumer_group: false,
226 max_retries: 3,
227 retry_delay_ms: 1000,
228 }
229 }
230}
231
232/// Platform source that reads events from Redis Streams.
233///
234/// This source connects to a Redis instance and consumes CloudEvent-wrapped
235/// messages from a stream using consumer groups. It supports both data events
236/// (node/relation changes) and control events (query subscriptions).
237///
238/// # Fields
239///
240/// - `base`: Common source functionality (dispatchers, status, lifecycle)
241/// - `config`: Platform-specific configuration (Redis connection, stream settings)
242pub struct PlatformSource {
243 /// Base source implementation providing common functionality
244 base: SourceBase,
245 /// Platform source configuration
246 config: PlatformSourceConfig,
247}
248
249/// Builder for creating [`PlatformSource`] instances.
250///
251/// Provides a fluent API for constructing platform sources
252/// with sensible defaults.
253///
254/// # Example
255///
256/// ```rust,ignore
257/// use drasi_source_platform::PlatformSource;
258///
259/// let source = PlatformSource::builder("my-platform-source")
260/// .with_redis_url("redis://localhost:6379")
261/// .with_stream_key("my-app-changes")
262/// .with_consumer_group("my-consumers")
263/// .with_batch_size(50)
264/// .build()?;
265/// ```
266pub struct PlatformSourceBuilder {
267 id: String,
268 redis_url: String,
269 stream_key: String,
270 consumer_group: Option<String>,
271 consumer_name: Option<String>,
272 batch_size: Option<usize>,
273 block_ms: Option<u64>,
274 dispatch_mode: Option<DispatchMode>,
275 dispatch_buffer_capacity: Option<usize>,
276 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
277 auto_start: bool,
278}
279
280impl PlatformSourceBuilder {
281 /// Create a new builder with the given ID and default values.
282 pub fn new(id: impl Into<String>) -> Self {
283 Self {
284 id: id.into(),
285 redis_url: String::new(),
286 stream_key: String::new(),
287 consumer_group: None,
288 consumer_name: None,
289 batch_size: None,
290 block_ms: None,
291 dispatch_mode: None,
292 dispatch_buffer_capacity: None,
293 bootstrap_provider: None,
294 auto_start: true,
295 }
296 }
297
298 /// Set the Redis connection URL.
299 ///
300 /// # Arguments
301 ///
302 /// * `url` - Redis connection URL (e.g., `redis://localhost:6379`)
303 pub fn with_redis_url(mut self, url: impl Into<String>) -> Self {
304 self.redis_url = url.into();
305 self
306 }
307
308 /// Set the Redis stream key to consume from.
309 ///
310 /// # Arguments
311 ///
312 /// * `key` - Name of the Redis stream
313 pub fn with_stream_key(mut self, key: impl Into<String>) -> Self {
314 self.stream_key = key.into();
315 self
316 }
317
318 /// Set the consumer group name.
319 ///
320 /// # Arguments
321 ///
322 /// * `group` - Consumer group name (default: `"drasi-core"`)
323 pub fn with_consumer_group(mut self, group: impl Into<String>) -> Self {
324 self.consumer_group = Some(group.into());
325 self
326 }
327
328 /// Set a unique consumer name within the group.
329 ///
330 /// # Arguments
331 ///
332 /// * `name` - Unique consumer name (auto-generated if not specified)
333 pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
334 self.consumer_name = Some(name.into());
335 self
336 }
337
338 /// Set the batch size for reading events.
339 ///
340 /// # Arguments
341 ///
342 /// * `size` - Number of events to read per XREADGROUP call (default: 100)
343 pub fn with_batch_size(mut self, size: usize) -> Self {
344 self.batch_size = Some(size);
345 self
346 }
347
348 /// Set the block timeout for waiting on events.
349 ///
350 /// # Arguments
351 ///
352 /// * `ms` - Milliseconds to block waiting for events (default: 5000)
353 pub fn with_block_ms(mut self, ms: u64) -> Self {
354 self.block_ms = Some(ms);
355 self
356 }
357
358 /// Set the dispatch mode for this source
359 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
360 self.dispatch_mode = Some(mode);
361 self
362 }
363
364 /// Set the dispatch buffer capacity for this source
365 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
366 self.dispatch_buffer_capacity = Some(capacity);
367 self
368 }
369
370 /// Set the bootstrap provider for this source
371 pub fn with_bootstrap_provider(
372 mut self,
373 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
374 ) -> Self {
375 self.bootstrap_provider = Some(Box::new(provider));
376 self
377 }
378
379 /// Set whether this source should auto-start when DrasiLib starts.
380 ///
381 /// Default is `true`. Set to `false` if this source should only be
382 /// started manually via `start_source()`.
383 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
384 self.auto_start = auto_start;
385 self
386 }
387
388 /// Set the full configuration at once
389 pub fn with_config(mut self, config: PlatformSourceConfig) -> Self {
390 self.redis_url = config.redis_url;
391 self.stream_key = config.stream_key;
392 self.consumer_group = Some(config.consumer_group);
393 self.consumer_name = config.consumer_name;
394 self.batch_size = Some(config.batch_size);
395 self.block_ms = Some(config.block_ms);
396 self
397 }
398
399 /// Build the platform source.
400 ///
401 /// # Errors
402 ///
403 /// Returns an error if the source cannot be constructed.
404 pub fn build(self) -> Result<PlatformSource> {
405 let config = PlatformSourceConfig {
406 redis_url: self.redis_url,
407 stream_key: self.stream_key,
408 consumer_group: self
409 .consumer_group
410 .unwrap_or_else(|| "drasi-core".to_string()),
411 consumer_name: self.consumer_name,
412 batch_size: self.batch_size.unwrap_or(100),
413 block_ms: self.block_ms.unwrap_or(5000),
414 };
415
416 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
417 if let Some(mode) = self.dispatch_mode {
418 params = params.with_dispatch_mode(mode);
419 }
420 if let Some(capacity) = self.dispatch_buffer_capacity {
421 params = params.with_dispatch_buffer_capacity(capacity);
422 }
423 if let Some(provider) = self.bootstrap_provider {
424 params = params.with_bootstrap_provider(provider);
425 }
426
427 Ok(PlatformSource {
428 base: SourceBase::new(params)?,
429 config,
430 })
431 }
432}
433
434impl PlatformSource {
435 /// Create a builder for PlatformSource
436 ///
437 /// # Example
438 ///
439 /// ```rust,ignore
440 /// use drasi_source_platform::PlatformSource;
441 ///
442 /// let source = PlatformSource::builder("my-platform-source")
443 /// .with_redis_url("redis://localhost:6379")
444 /// .with_stream_key("my-changes")
445 /// .with_bootstrap_provider(my_provider)
446 /// .build()?;
447 /// ```
448 pub fn builder(id: impl Into<String>) -> PlatformSourceBuilder {
449 PlatformSourceBuilder::new(id)
450 }
451
452 /// Create a new platform source.
453 ///
454 /// The event channel is automatically injected when the source is added
455 /// to DrasiLib via `add_source()`.
456 ///
457 /// # Arguments
458 ///
459 /// * `id` - Unique identifier for this source instance
460 /// * `config` - Platform source configuration
461 ///
462 /// # Returns
463 ///
464 /// A new `PlatformSource` instance, or an error if construction fails.
465 ///
466 /// # Errors
467 ///
468 /// Returns an error if the base source cannot be initialized.
469 ///
470 /// # Example
471 ///
472 /// ```rust,ignore
473 /// use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
474 ///
475 /// let config = PlatformSourceBuilder::new("my-platform-source")
476 /// .with_redis_url("redis://localhost:6379")
477 /// .with_stream_key("my-changes")
478 /// .build()?;
479 /// ```
480 pub fn new(id: impl Into<String>, config: PlatformSourceConfig) -> Result<Self> {
481 let id = id.into();
482 let params = SourceBaseParams::new(id);
483 Ok(Self {
484 base: SourceBase::new(params)?,
485 config,
486 })
487 }
488
489 /// Subscribe to source events (for testing)
490 ///
491 /// This method is intended for use in tests to receive events broadcast by the source.
492 /// In production, queries subscribe to sources through the SourceManager.
493 /// Parse configuration from properties
494 #[allow(dead_code)]
495 fn parse_config(properties: &HashMap<String, Value>) -> Result<PlatformConfig> {
496 // Extract required fields first
497 let redis_url = properties
498 .get("redis_url")
499 .and_then(|v| v.as_str())
500 .ok_or_else(|| {
501 anyhow::anyhow!(
502 "Configuration error: Missing required field 'redis_url'. \
503 Platform source requires a Redis connection URL"
504 )
505 })?
506 .to_string();
507
508 let stream_key = properties
509 .get("stream_key")
510 .and_then(|v| v.as_str())
511 .ok_or_else(|| {
512 anyhow::anyhow!(
513 "Configuration error: Missing required field 'stream_key'. \
514 Platform source requires a Redis Stream key to read from"
515 )
516 })?
517 .to_string();
518
519 let consumer_group = properties
520 .get("consumer_group")
521 .and_then(|v| v.as_str())
522 .ok_or_else(|| {
523 anyhow::anyhow!(
524 "Configuration error: Missing required field 'consumer_group'. \
525 Platform source requires a consumer group name"
526 )
527 })?
528 .to_string();
529
530 let consumer_name = properties
531 .get("consumer_name")
532 .and_then(|v| v.as_str())
533 .ok_or_else(|| {
534 anyhow::anyhow!(
535 "Configuration error: Missing required field 'consumer_name'. \
536 Platform source requires a unique consumer name"
537 )
538 })?
539 .to_string();
540
541 // Get defaults for optional field handling
542 let defaults = PlatformConfig::default();
543
544 // Build config with optional fields
545 let config = PlatformConfig {
546 redis_url,
547 stream_key,
548 consumer_group,
549 consumer_name,
550 batch_size: properties
551 .get("batch_size")
552 .and_then(|v| v.as_u64())
553 .map(|v| v as usize)
554 .unwrap_or(defaults.batch_size),
555 block_ms: properties
556 .get("block_ms")
557 .and_then(|v| v.as_u64())
558 .unwrap_or(defaults.block_ms),
559 start_id: properties
560 .get("start_id")
561 .and_then(|v| v.as_str())
562 .map(|s| s.to_string())
563 .unwrap_or(defaults.start_id),
564 always_create_consumer_group: properties
565 .get("always_create_consumer_group")
566 .and_then(|v| v.as_bool())
567 .unwrap_or(defaults.always_create_consumer_group),
568 max_retries: properties
569 .get("max_retries")
570 .and_then(|v| v.as_u64())
571 .map(|v| v as usize)
572 .unwrap_or(defaults.max_retries),
573 retry_delay_ms: properties
574 .get("retry_delay_ms")
575 .and_then(|v| v.as_u64())
576 .unwrap_or(defaults.retry_delay_ms),
577 };
578
579 // Validate
580 if config.redis_url.is_empty() {
581 return Err(anyhow::anyhow!(
582 "Validation error: redis_url cannot be empty. \
583 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
584 ));
585 }
586 if config.stream_key.is_empty() {
587 return Err(anyhow::anyhow!(
588 "Validation error: stream_key cannot be empty. \
589 Please specify the Redis Stream key to read from"
590 ));
591 }
592 if config.consumer_group.is_empty() {
593 return Err(anyhow::anyhow!(
594 "Validation error: consumer_group cannot be empty. \
595 Please specify a consumer group name for this source"
596 ));
597 }
598 if config.consumer_name.is_empty() {
599 return Err(anyhow::anyhow!(
600 "Validation error: consumer_name cannot be empty. \
601 Please specify a unique consumer name within the consumer group"
602 ));
603 }
604
605 Ok(config)
606 }
607
608 /// Connect to Redis with retry logic
609 async fn connect_with_retry(
610 redis_url: &str,
611 max_retries: usize,
612 retry_delay_ms: u64,
613 ) -> Result<redis::aio::MultiplexedConnection> {
614 let client = redis::Client::open(redis_url)?;
615 let mut delay = retry_delay_ms;
616
617 for attempt in 0..max_retries {
618 match client.get_multiplexed_async_connection().await {
619 Ok(conn) => {
620 info!("Successfully connected to Redis");
621 return Ok(conn);
622 }
623 Err(e) if attempt < max_retries - 1 => {
624 warn!(
625 "Redis connection failed (attempt {}/{}): {}",
626 attempt + 1,
627 max_retries,
628 e
629 );
630 tokio::time::sleep(Duration::from_millis(delay)).await;
631 delay *= 2; // Exponential backoff
632 }
633 Err(e) => {
634 return Err(anyhow::anyhow!(
635 "Failed to connect to Redis after {max_retries} attempts: {e}"
636 ));
637 }
638 }
639 }
640
641 unreachable!()
642 }
643
644 /// Create or recreate consumer group based on configuration
645 async fn create_consumer_group(
646 conn: &mut redis::aio::MultiplexedConnection,
647 stream_key: &str,
648 consumer_group: &str,
649 start_id: &str,
650 always_create: bool,
651 ) -> Result<()> {
652 // Determine the initial position for the consumer group
653 let group_start_id = if start_id == ">" {
654 "$" // ">" means only new messages, so create group at end
655 } else {
656 start_id // "0" or specific ID
657 };
658
659 // If always_create is true, delete the existing group first
660 if always_create {
661 info!(
662 "always_create_consumer_group=true, deleting consumer group '{consumer_group}' if it exists"
663 );
664
665 let destroy_result: Result<i64, redis::RedisError> = redis::cmd("XGROUP")
666 .arg("DESTROY")
667 .arg(stream_key)
668 .arg(consumer_group)
669 .query_async(conn)
670 .await;
671
672 match destroy_result {
673 Ok(1) => info!("Successfully deleted consumer group '{consumer_group}'"),
674 Ok(0) => debug!("Consumer group '{consumer_group}' did not exist"),
675 Ok(n) => warn!("Unexpected result from XGROUP DESTROY: {n}"),
676 Err(e) => warn!("Error deleting consumer group (will continue): {e}"),
677 }
678 }
679
680 // Try to create the consumer group
681 let result: Result<String, redis::RedisError> = redis::cmd("XGROUP")
682 .arg("CREATE")
683 .arg(stream_key)
684 .arg(consumer_group)
685 .arg(group_start_id)
686 .arg("MKSTREAM")
687 .query_async(conn)
688 .await;
689
690 match result {
691 Ok(_) => {
692 info!(
693 "Created consumer group '{consumer_group}' for stream '{stream_key}' at position '{group_start_id}'"
694 );
695 Ok(())
696 }
697 Err(e) => {
698 // BUSYGROUP error means the group already exists
699 if e.to_string().contains("BUSYGROUP") {
700 info!(
701 "Consumer group '{consumer_group}' already exists for stream '{stream_key}', will resume from last position"
702 );
703 Ok(())
704 } else {
705 Err(anyhow::anyhow!("Failed to create consumer group: {e}"))
706 }
707 }
708 }
709 }
710
711 /// Start the stream consumer task
712 async fn start_consumer_task(
713 source_id: String,
714 instance_id: String,
715 platform_config: PlatformConfig,
716 dispatchers: Arc<
717 RwLock<
718 Vec<
719 Box<
720 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
721 >,
722 >,
723 >,
724 >,
725 event_tx: Arc<RwLock<Option<ComponentEventSender>>>,
726 status: Arc<RwLock<ComponentStatus>>,
727 ) -> JoinHandle<()> {
728 let source_id_for_span = source_id.clone();
729 let span = tracing::info_span!(
730 "platform_source_consumer",
731 instance_id = %instance_id,
732 component_id = %source_id_for_span,
733 component_type = "source"
734 );
735 tokio::spawn(async move {
736 info!(
737 "Starting platform source consumer for source '{}' on stream '{}'",
738 source_id, platform_config.stream_key
739 );
740
741 // Connect to Redis
742 let mut conn = match Self::connect_with_retry(
743 &platform_config.redis_url,
744 platform_config.max_retries,
745 platform_config.retry_delay_ms,
746 )
747 .await
748 {
749 Ok(conn) => conn,
750 Err(e) => {
751 error!("Failed to connect to Redis: {e}");
752 if let Some(ref tx) = *event_tx.read().await {
753 let _ = tx
754 .send(ComponentEvent {
755 component_id: source_id.clone(),
756 component_type: ComponentType::Source,
757 status: ComponentStatus::Stopped,
758 timestamp: chrono::Utc::now(),
759 message: Some(format!("Failed to connect to Redis: {e}")),
760 })
761 .await;
762 }
763 *status.write().await = ComponentStatus::Stopped;
764 return;
765 }
766 };
767
768 // Create consumer group
769 if let Err(e) = Self::create_consumer_group(
770 &mut conn,
771 &platform_config.stream_key,
772 &platform_config.consumer_group,
773 &platform_config.start_id,
774 platform_config.always_create_consumer_group,
775 )
776 .await
777 {
778 error!("Failed to create consumer group: {e}");
779 if let Some(ref tx) = *event_tx.read().await {
780 let _ = tx
781 .send(ComponentEvent {
782 component_id: source_id.clone(),
783 component_type: ComponentType::Source,
784 status: ComponentStatus::Stopped,
785 timestamp: chrono::Utc::now(),
786 message: Some(format!("Failed to create consumer group: {e}")),
787 })
788 .await;
789 }
790 *status.write().await = ComponentStatus::Stopped;
791 return;
792 }
793
794 // Main consumer loop
795 loop {
796 // Read from stream using ">" to get next undelivered messages for this consumer group
797 let read_result: Result<StreamReadReply, redis::RedisError> =
798 redis::cmd("XREADGROUP")
799 .arg("GROUP")
800 .arg(&platform_config.consumer_group)
801 .arg(&platform_config.consumer_name)
802 .arg("COUNT")
803 .arg(platform_config.batch_size)
804 .arg("BLOCK")
805 .arg(platform_config.block_ms)
806 .arg("STREAMS")
807 .arg(&platform_config.stream_key)
808 .arg(">") // Always use ">" for consumer group reads
809 .query_async(&mut conn)
810 .await;
811
812 match read_result {
813 Ok(reply) => {
814 // Collect all stream IDs for batch acknowledgment
815 let mut all_stream_ids = Vec::new();
816
817 // Process each stream entry
818 for stream_key in reply.keys {
819 for stream_id in stream_key.ids {
820 debug!("Received event from stream: {}", stream_id.id);
821
822 // Store stream ID for batch acknowledgment
823 all_stream_ids.push(stream_id.id.clone());
824
825 // Extract event data
826 match extract_event_data(&stream_id.map) {
827 Ok(event_json) => {
828 // Parse JSON
829 match serde_json::from_str::<Value>(&event_json) {
830 Ok(cloud_event) => {
831 // Detect message type
832 let message_type =
833 detect_message_type(&cloud_event);
834
835 match message_type {
836 MessageType::Control(control_type) => {
837 // Handle control message
838 debug!(
839 "Detected control message of type: {control_type}"
840 );
841
842 match transform_control_event(
843 cloud_event,
844 &control_type,
845 ) {
846 Ok(control_events) => {
847 // Publish control events
848 for control_event in control_events
849 {
850 // Create profiling metadata with timestamps
851 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
852 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
853
854 let wrapper = SourceEventWrapper::with_profiling(
855 source_id.clone(),
856 SourceEvent::Control(control_event),
857 chrono::Utc::now(),
858 profiling,
859 );
860
861 // Dispatch via helper
862 if let Err(e) = SourceBase::dispatch_from_task(
863 dispatchers.clone(),
864 wrapper,
865 &source_id,
866 )
867 .await
868 {
869 debug!("[{source_id}] Failed to dispatch control event (no subscribers): {e}");
870 } else {
871 debug!(
872 "Published control event for stream {}",
873 stream_id.id
874 );
875 }
876 }
877 }
878 Err(e) => {
879 warn!(
880 "Failed to transform control event {}: {}",
881 stream_id.id, e
882 );
883 }
884 }
885 }
886 MessageType::Data => {
887 // Handle data message
888 match transform_platform_event(
889 cloud_event,
890 &source_id,
891 ) {
892 Ok(source_changes_with_timestamps) => {
893 // Publish source changes
894 for item in
895 source_changes_with_timestamps
896 {
897 // Create profiling metadata with timestamps
898 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
899 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
900
901 // Extract source_ns from SourceChange transaction time
902 profiling.source_ns = Some(
903 item.source_change
904 .get_transaction_time(),
905 );
906
907 // Set reactivator timestamps from event
908 profiling
909 .reactivator_start_ns =
910 item.reactivator_start_ns;
911 profiling.reactivator_end_ns =
912 item.reactivator_end_ns;
913
914 let wrapper = SourceEventWrapper::with_profiling(
915 source_id.clone(),
916 SourceEvent::Change(item.source_change),
917 chrono::Utc::now(),
918 profiling,
919 );
920
921 // Dispatch via helper
922 if let Err(e) = SourceBase::dispatch_from_task(
923 dispatchers.clone(),
924 wrapper,
925 &source_id,
926 )
927 .await
928 {
929 debug!("[{source_id}] Failed to dispatch change (no subscribers): {e}");
930 } else {
931 debug!(
932 "Published source change for event {}",
933 stream_id.id
934 );
935 }
936 }
937 }
938 Err(e) => {
939 warn!(
940 "Failed to transform event {}: {}",
941 stream_id.id, e
942 );
943 if let Some(ref tx) =
944 *event_tx.read().await
945 {
946 let _ = tx
947 .send(ComponentEvent {
948 component_id: source_id.clone(),
949 component_type:
950 ComponentType::Source,
951 status: ComponentStatus::Running,
952 timestamp: chrono::Utc::now(),
953 message: Some(format!(
954 "Transformation error: {e}"
955 )),
956 })
957 .await;
958 }
959 }
960 }
961 }
962 }
963 }
964 Err(e) => {
965 warn!(
966 "Failed to parse JSON for event {}: {}",
967 stream_id.id, e
968 );
969 }
970 }
971 }
972 Err(e) => {
973 warn!(
974 "Failed to extract event data from {}: {}",
975 stream_id.id, e
976 );
977 }
978 }
979 }
980 }
981
982 // Batch acknowledge all messages at once
983 if !all_stream_ids.is_empty() {
984 debug!("Acknowledging batch of {} messages", all_stream_ids.len());
985
986 let mut cmd = redis::cmd("XACK");
987 cmd.arg(&platform_config.stream_key)
988 .arg(&platform_config.consumer_group);
989
990 // Add all stream IDs to the command
991 for stream_id in &all_stream_ids {
992 cmd.arg(stream_id);
993 }
994
995 match cmd.query_async::<_, i64>(&mut conn).await {
996 Ok(ack_count) => {
997 debug!("Successfully acknowledged {ack_count} messages");
998 if ack_count as usize != all_stream_ids.len() {
999 warn!(
1000 "Acknowledged {} messages but expected {}",
1001 ack_count,
1002 all_stream_ids.len()
1003 );
1004 }
1005 }
1006 Err(e) => {
1007 error!("Failed to acknowledge message batch: {e}");
1008
1009 // Fallback: Try acknowledging messages individually
1010 warn!("Falling back to individual acknowledgments");
1011 for stream_id in &all_stream_ids {
1012 match redis::cmd("XACK")
1013 .arg(&platform_config.stream_key)
1014 .arg(&platform_config.consumer_group)
1015 .arg(stream_id)
1016 .query_async::<_, i64>(&mut conn)
1017 .await
1018 {
1019 Ok(_) => {
1020 debug!(
1021 "Individually acknowledged message {stream_id}"
1022 );
1023 }
1024 Err(e) => {
1025 error!(
1026 "Failed to individually acknowledge message {stream_id}: {e}"
1027 );
1028 }
1029 }
1030 }
1031 }
1032 }
1033 }
1034 }
1035 Err(e) => {
1036 // Check if it's a connection error
1037 if is_connection_error(&e) {
1038 error!("Redis connection lost: {e}");
1039 if let Some(ref tx) = *event_tx.read().await {
1040 let _ = tx
1041 .send(ComponentEvent {
1042 component_id: source_id.clone(),
1043 component_type: ComponentType::Source,
1044 status: ComponentStatus::Running,
1045 timestamp: chrono::Utc::now(),
1046 message: Some(format!("Redis connection lost: {e}")),
1047 })
1048 .await;
1049 }
1050
1051 // Try to reconnect
1052 match Self::connect_with_retry(
1053 &platform_config.redis_url,
1054 platform_config.max_retries,
1055 platform_config.retry_delay_ms,
1056 )
1057 .await
1058 {
1059 Ok(new_conn) => {
1060 conn = new_conn;
1061 info!("Reconnected to Redis");
1062 }
1063 Err(e) => {
1064 error!("Failed to reconnect to Redis: {e}");
1065 *status.write().await = ComponentStatus::Stopped;
1066 return;
1067 }
1068 }
1069 } else if !e.to_string().contains("timeout") {
1070 // Log non-timeout errors
1071 error!("Error reading from stream: {e}");
1072 }
1073 }
1074 }
1075 }
1076 }.instrument(span))
1077 }
1078}
1079
1080#[async_trait::async_trait]
1081impl Source for PlatformSource {
1082 fn id(&self) -> &str {
1083 &self.base.id
1084 }
1085
1086 fn type_name(&self) -> &str {
1087 "platform"
1088 }
1089
1090 fn properties(&self) -> HashMap<String, serde_json::Value> {
1091 let mut props = HashMap::new();
1092 props.insert(
1093 "redis_url".to_string(),
1094 serde_json::Value::String(self.config.redis_url.clone()),
1095 );
1096 props.insert(
1097 "stream_key".to_string(),
1098 serde_json::Value::String(self.config.stream_key.clone()),
1099 );
1100 props.insert(
1101 "consumer_group".to_string(),
1102 serde_json::Value::String(self.config.consumer_group.clone()),
1103 );
1104 if let Some(ref consumer_name) = self.config.consumer_name {
1105 props.insert(
1106 "consumer_name".to_string(),
1107 serde_json::Value::String(consumer_name.clone()),
1108 );
1109 }
1110 props.insert(
1111 "batch_size".to_string(),
1112 serde_json::Value::Number(self.config.batch_size.into()),
1113 );
1114 props.insert(
1115 "block_ms".to_string(),
1116 serde_json::Value::Number(self.config.block_ms.into()),
1117 );
1118 props
1119 }
1120
1121 fn auto_start(&self) -> bool {
1122 self.base.get_auto_start()
1123 }
1124
1125 async fn start(&self) -> Result<()> {
1126 info!("Starting platform source: {}", self.base.id);
1127
1128 // Extract configuration from typed config
1129 let platform_config = PlatformConfig {
1130 redis_url: self.config.redis_url.clone(),
1131 stream_key: self.config.stream_key.clone(),
1132 consumer_group: self.config.consumer_group.clone(),
1133 consumer_name: self
1134 .config
1135 .consumer_name
1136 .clone()
1137 .unwrap_or_else(|| format!("drasi-consumer-{}", self.base.id)),
1138 batch_size: self.config.batch_size,
1139 block_ms: self.config.block_ms,
1140 start_id: ">".to_string(),
1141 always_create_consumer_group: false,
1142 max_retries: 5,
1143 retry_delay_ms: 1000,
1144 };
1145
1146 // Update status
1147 *self.base.status.write().await = ComponentStatus::Running;
1148
1149 // Get instance_id from context for log routing isolation
1150 let instance_id = self
1151 .base
1152 .context()
1153 .await
1154 .map(|c| c.instance_id)
1155 .unwrap_or_default();
1156
1157 // Start consumer task
1158 let task = Self::start_consumer_task(
1159 self.base.id.clone(),
1160 instance_id,
1161 platform_config,
1162 self.base.dispatchers.clone(),
1163 self.base.status_tx(),
1164 self.base.status.clone(),
1165 )
1166 .await;
1167
1168 *self.base.task_handle.write().await = Some(task);
1169
1170 Ok(())
1171 }
1172
1173 async fn stop(&self) -> Result<()> {
1174 info!("Stopping platform source: {}", self.base.id);
1175
1176 // Cancel the task
1177 if let Some(handle) = self.base.task_handle.write().await.take() {
1178 handle.abort();
1179 }
1180
1181 // Update status
1182 *self.base.status.write().await = ComponentStatus::Stopped;
1183
1184 Ok(())
1185 }
1186
1187 async fn status(&self) -> ComponentStatus {
1188 self.base.status.read().await.clone()
1189 }
1190
1191 async fn subscribe(
1192 &self,
1193 settings: drasi_lib::config::SourceSubscriptionSettings,
1194 ) -> Result<SubscriptionResponse> {
1195 self.base
1196 .subscribe_with_bootstrap(&settings, "Platform")
1197 .await
1198 }
1199
1200 fn as_any(&self) -> &dyn std::any::Any {
1201 self
1202 }
1203
1204 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1205 self.base.initialize(context).await;
1206 }
1207
1208 async fn set_bootstrap_provider(
1209 &self,
1210 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1211 ) {
1212 self.base.set_bootstrap_provider(provider).await;
1213 }
1214}
1215
1216impl PlatformSource {
1217 /// Create a test subscription to this source (synchronous)
1218 ///
1219 /// This method delegates to SourceBase and is provided for convenience in tests.
1220 /// Note: Use test_subscribe_async() in async contexts to avoid runtime issues.
1221 pub fn test_subscribe(
1222 &self,
1223 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1224 self.base.test_subscribe()
1225 }
1226
1227 /// Create a test subscription to this source (async)
1228 ///
1229 /// This method delegates to SourceBase and is provided for convenience in async tests.
1230 /// Prefer this method over test_subscribe() in async contexts.
1231 pub async fn test_subscribe_async(
1232 &self,
1233 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
1234 self.base
1235 .create_streaming_receiver()
1236 .await
1237 .expect("Failed to create test subscription")
1238 }
1239}
1240
1241/// Extract event data from Redis stream entry
1242fn extract_event_data(entry_map: &HashMap<String, redis::Value>) -> Result<String> {
1243 // Look for common field names
1244 for key in &["data", "event", "payload", "message"] {
1245 if let Some(redis::Value::Data(data)) = entry_map.get(*key) {
1246 return String::from_utf8(data.clone())
1247 .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in event data: {e}"));
1248 }
1249 }
1250
1251 Err(anyhow::anyhow!(
1252 "No event data found in stream entry. Available keys: {:?}",
1253 entry_map.keys().collect::<Vec<_>>()
1254 ))
1255}
1256
1257/// Check if error is a connection error
1258fn is_connection_error(e: &redis::RedisError) -> bool {
1259 e.is_connection_dropped()
1260 || e.is_io_error()
1261 || e.to_string().contains("connection")
1262 || e.to_string().contains("EOF")
1263}
1264
1265/// Message type discriminator
1266#[derive(Debug, Clone, PartialEq)]
1267enum MessageType {
1268 /// Control message with control type from source.table
1269 Control(String),
1270 /// Data message (node/relation change)
1271 Data,
1272}
1273
1274/// Detect message type based on payload.source.db field
1275///
1276/// Returns MessageType::Control with table name if source.db = "Drasi" (case-insensitive)
1277/// Returns MessageType::Data for all other cases
1278fn detect_message_type(cloud_event: &Value) -> MessageType {
1279 // Extract data array and get first event to check message type
1280 let data_array = match cloud_event["data"].as_array() {
1281 Some(arr) if !arr.is_empty() => arr,
1282 _ => return MessageType::Data, // Default to data if no data array
1283 };
1284
1285 // Check the first event's source.db field
1286 let first_event = &data_array[0];
1287 let source_db = first_event["payload"]["source"]["db"]
1288 .as_str()
1289 .unwrap_or("");
1290
1291 // Case-insensitive comparison with "Drasi"
1292 if source_db.eq_ignore_ascii_case("drasi") {
1293 // Extract source.table to determine control type
1294 let control_type = first_event["payload"]["source"]["table"]
1295 .as_str()
1296 .unwrap_or("Unknown")
1297 .to_string();
1298 MessageType::Control(control_type)
1299 } else {
1300 MessageType::Data
1301 }
1302}
1303
1304/// Helper struct to hold SourceChange along with reactivator timestamps
1305#[derive(Debug)]
1306struct SourceChangeWithTimestamps {
1307 source_change: SourceChange,
1308 reactivator_start_ns: Option<u64>,
1309 reactivator_end_ns: Option<u64>,
1310}
1311
1312/// Transform CloudEvent-wrapped platform event to drasi-core SourceChange(s)
1313///
1314/// Handles events in CloudEvent format with a data array containing change events.
1315/// Each event in the data array has: op, payload.after/before, payload.source
1316fn transform_platform_event(
1317 cloud_event: Value,
1318 source_id: &str,
1319) -> Result<Vec<SourceChangeWithTimestamps>> {
1320 let mut source_changes = Vec::new();
1321
1322 // Extract the data array from CloudEvent wrapper
1323 let data_array = cloud_event["data"]
1324 .as_array()
1325 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1326
1327 // Process each event in the data array
1328 for event in data_array {
1329 // Extract reactivator timestamps from top-level event fields
1330 let reactivator_start_ns = event["reactivatorStart_ns"].as_u64();
1331 let reactivator_end_ns = event["reactivatorEnd_ns"].as_u64();
1332
1333 // Extract operation type (op field: "i", "u", "d")
1334 let op = event["op"]
1335 .as_str()
1336 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field"))?;
1337
1338 // Extract payload
1339 let payload = &event["payload"];
1340 if payload.is_null() {
1341 return Err(anyhow::anyhow!("Missing 'payload' field"));
1342 }
1343
1344 // Extract element type from payload.source.table
1345 let element_type = payload["source"]["table"]
1346 .as_str()
1347 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.table' field"))?;
1348
1349 // Get element data based on operation
1350 let element_data = match op {
1351 "i" | "u" => &payload["after"],
1352 "d" => &payload["before"],
1353 _ => return Err(anyhow::anyhow!("Unknown operation type: {op}")),
1354 };
1355
1356 if element_data.is_null() {
1357 return Err(anyhow::anyhow!(
1358 "Missing element data (after/before) for operation {op}"
1359 ));
1360 }
1361
1362 // Extract element ID
1363 let element_id = element_data["id"]
1364 .as_str()
1365 .ok_or_else(|| anyhow::anyhow!("Missing or invalid element 'id' field"))?;
1366
1367 // Extract labels
1368 let labels_array = element_data["labels"]
1369 .as_array()
1370 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'labels' field"))?;
1371
1372 let labels: Vec<Arc<str>> = labels_array
1373 .iter()
1374 .filter_map(|v| v.as_str().map(Arc::from))
1375 .collect();
1376
1377 if labels.is_empty() {
1378 return Err(anyhow::anyhow!("Labels array is empty or invalid"));
1379 }
1380
1381 // Extract timestamp from payload.source.ts_ns (in nanoseconds) and convert to milliseconds
1382 let ts_ns = payload["source"]["ts_ns"]
1383 .as_u64()
1384 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'payload.source.ts_ns' field"))?;
1385 let effective_from = ts_ns / 1_000_000; // Convert nanoseconds to milliseconds
1386
1387 // Build ElementMetadata
1388 let reference = ElementReference::new(source_id, element_id);
1389 let metadata = ElementMetadata {
1390 reference,
1391 labels: labels.into(),
1392 effective_from,
1393 };
1394
1395 // Handle delete operation (no properties needed)
1396 if op == "d" {
1397 source_changes.push(SourceChangeWithTimestamps {
1398 source_change: SourceChange::Delete { metadata },
1399 reactivator_start_ns,
1400 reactivator_end_ns,
1401 });
1402 continue;
1403 }
1404
1405 // Convert properties
1406 let properties_obj = element_data["properties"]
1407 .as_object()
1408 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'properties' field"))?;
1409
1410 let properties = convert_json_to_element_properties(properties_obj)?;
1411
1412 // Build element based on type
1413 let element = match element_type {
1414 "node" => Element::Node {
1415 metadata,
1416 properties,
1417 },
1418 "rel" | "relation" => {
1419 // Extract startId and endId
1420 let start_id = element_data["startId"]
1421 .as_str()
1422 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'startId' for relation"))?;
1423 let end_id = element_data["endId"]
1424 .as_str()
1425 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'endId' for relation"))?;
1426
1427 Element::Relation {
1428 metadata,
1429 properties,
1430 in_node: ElementReference::new(source_id, start_id),
1431 out_node: ElementReference::new(source_id, end_id),
1432 }
1433 }
1434 _ => return Err(anyhow::anyhow!("Unknown element type: {element_type}")),
1435 };
1436
1437 // Build SourceChange
1438 let source_change = match op {
1439 "i" => SourceChange::Insert { element },
1440 "u" => SourceChange::Update { element },
1441 _ => unreachable!(),
1442 };
1443
1444 source_changes.push(SourceChangeWithTimestamps {
1445 source_change,
1446 reactivator_start_ns,
1447 reactivator_end_ns,
1448 });
1449 }
1450
1451 Ok(source_changes)
1452}
1453
1454/// Transform CloudEvent-wrapped control event to SourceControl(s)
1455///
1456/// Handles control messages from Query API service with source.db = "Drasi"
1457/// Currently supports "SourceSubscription" control type
1458fn transform_control_event(cloud_event: Value, control_type: &str) -> Result<Vec<SourceControl>> {
1459 let mut control_events = Vec::new();
1460
1461 // Extract the data array from CloudEvent wrapper
1462 let data_array = cloud_event["data"]
1463 .as_array()
1464 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'data' array in CloudEvent"))?;
1465
1466 // Check if control type is supported
1467 if control_type != "SourceSubscription" {
1468 info!(
1469 "Skipping unknown control type '{control_type}' (only 'SourceSubscription' is supported)"
1470 );
1471 return Ok(control_events); // Return empty vec for unknown types
1472 }
1473
1474 // Process each event in the data array
1475 for event in data_array {
1476 // Extract operation type (op field: "i", "u", "d")
1477 let op = event["op"]
1478 .as_str()
1479 .ok_or_else(|| anyhow::anyhow!("Missing or invalid 'op' field in control event"))?;
1480
1481 // Extract payload
1482 let payload = &event["payload"];
1483 if payload.is_null() {
1484 warn!("Missing 'payload' field in control event, skipping");
1485 continue;
1486 }
1487
1488 // Get data based on operation
1489 let control_data = match op {
1490 "i" | "u" => &payload["after"],
1491 "d" => &payload["before"],
1492 _ => {
1493 warn!("Unknown operation type in control event: {op}, skipping");
1494 continue;
1495 }
1496 };
1497
1498 if control_data.is_null() {
1499 warn!("Missing control data (after/before) for operation {op}, skipping");
1500 continue;
1501 }
1502
1503 // Extract required fields for SourceSubscription
1504 let query_id = match control_data["queryId"].as_str() {
1505 Some(id) => id.to_string(),
1506 None => {
1507 warn!("Missing required 'queryId' field in control event, skipping");
1508 continue;
1509 }
1510 };
1511
1512 let query_node_id = match control_data["queryNodeId"].as_str() {
1513 Some(id) => id.to_string(),
1514 None => {
1515 warn!("Missing required 'queryNodeId' field in control event, skipping");
1516 continue;
1517 }
1518 };
1519
1520 // Extract optional label arrays (default to empty if missing)
1521 let node_labels = control_data["nodeLabels"]
1522 .as_array()
1523 .map(|arr| {
1524 arr.iter()
1525 .filter_map(|v| v.as_str().map(String::from))
1526 .collect()
1527 })
1528 .unwrap_or_default();
1529
1530 let rel_labels = control_data["relLabels"]
1531 .as_array()
1532 .map(|arr| {
1533 arr.iter()
1534 .filter_map(|v| v.as_str().map(String::from))
1535 .collect()
1536 })
1537 .unwrap_or_default();
1538
1539 // Map operation to ControlOperation
1540 let operation = match op {
1541 "i" => ControlOperation::Insert,
1542 "u" => ControlOperation::Update,
1543 "d" => ControlOperation::Delete,
1544 _ => unreachable!(), // Already filtered above
1545 };
1546
1547 // Build SourceControl::Subscription
1548 let control_event = SourceControl::Subscription {
1549 query_id,
1550 query_node_id,
1551 node_labels,
1552 rel_labels,
1553 operation,
1554 };
1555
1556 control_events.push(control_event);
1557 }
1558
1559 Ok(control_events)
1560}
1561
1562/// Dynamic plugin entry point.
1563///
1564/// Dynamic plugin entry point.
1565#[cfg(feature = "dynamic-plugin")]
1566drasi_plugin_sdk::export_plugin!(
1567 plugin_id = "platform-source",
1568 core_version = env!("CARGO_PKG_VERSION"),
1569 lib_version = env!("CARGO_PKG_VERSION"),
1570 plugin_version = env!("CARGO_PKG_VERSION"),
1571 source_descriptors = [descriptor::PlatformSourceDescriptor],
1572 reaction_descriptors = [],
1573 bootstrap_descriptors = [],
1574);