Skip to main content

drasi_source_mock/
mock.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::config::{DataType, MockSourceConfig};
16use anyhow::Result;
17use async_trait::async_trait;
18use drasi_core::models::{
19    Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange,
20};
21use log::{debug, info};
22use serde_json::Value;
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27use drasi_lib::channels::*;
28use drasi_lib::managers::{log_component_start, log_component_stop};
29use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
30use drasi_lib::Source;
31use tracing::Instrument;
32
33/// Mock source that generates synthetic data for testing and development.
34///
35/// This source runs an internal tokio task that generates data at configurable
36/// intervals. It supports different data types (Counter, SensorReading, Generic)
37/// to simulate various real-world scenarios.
38///
39/// # Event Generation Behavior
40///
41/// - **Counter**: Always emits INSERT events with sequential IDs
42/// - **SensorReading**: Emits INSERT for first reading per sensor, UPDATE thereafter
43/// - **Generic**: Always emits INSERT events with sequential IDs
44///
45/// # Thread Safety
46///
47/// This type is `Send + Sync` and can be safely shared across threads.
48/// Internal state is protected by `RwLock`.
49pub struct MockSource {
50    /// Base source implementation providing dispatchers, status tracking, and lifecycle management.
51    base: SourceBase,
52
53    /// Configuration specifying data type and generation interval.
54    config: MockSourceConfig,
55
56    /// Tracks sensor IDs that have been seen for INSERT vs UPDATE logic.
57    /// Only used when `config.data_type` is `SensorReading`.
58    seen_sensors: Arc<RwLock<HashSet<u32>>>,
59}
60
61impl MockSource {
62    /// Creates a new `MockSource` with the given ID and configuration.
63    ///
64    /// The source is created in a stopped state. Call [`start()`](Self::start) to begin
65    /// generating events, or add it to DrasiLib which will start it automatically
66    /// (unless `auto_start` is disabled via the builder).
67    ///
68    /// # Arguments
69    ///
70    /// * `id` - Unique identifier for this source instance. Must be unique within a DrasiLib instance.
71    /// * `config` - Configuration specifying data type and generation interval.
72    ///
73    /// # Returns
74    ///
75    /// A new `MockSource` instance, or an error if validation fails.
76    ///
77    /// # Errors
78    ///
79    /// Returns [`anyhow::Error`] if:
80    /// - `config.interval_ms` is 0 (would cause spin loop)
81    /// - `config.data_type` is `SensorReading` with `sensor_count` of 0
82    ///
83    /// # Example
84    ///
85    /// ```rust,ignore
86    /// use drasi_source_mock::{MockSource, MockSourceConfig, DataType};
87    ///
88    /// let config = MockSourceConfig {
89    ///     data_type: DataType::sensor_reading(10),
90    ///     interval_ms: 1000,
91    /// };
92    ///
93    /// let source = MockSource::new("my-mock-source", config)?;
94    /// ```
95    pub fn new(id: impl Into<String>, config: MockSourceConfig) -> Result<Self> {
96        config.validate()?;
97        let id = id.into();
98        let params = SourceBaseParams::new(id);
99        Ok(Self {
100            base: SourceBase::new(params)?,
101            config,
102            seen_sensors: Arc::new(RwLock::new(HashSet::new())),
103        })
104    }
105
106    /// Creates a new `MockSource` with custom dispatch settings.
107    ///
108    /// This is a lower-level constructor for advanced use cases where you need
109    /// control over event dispatching. For most cases, prefer [`MockSource::builder()`].
110    ///
111    /// # Arguments
112    ///
113    /// * `id` - Unique identifier for this source instance.
114    /// * `config` - Configuration specifying data type and generation interval.
115    /// * `dispatch_mode` - Optional dispatch mode (`Channel` or `Broadcast`).
116    /// * `dispatch_buffer_capacity` - Optional buffer size for dispatch channels.
117    ///
118    /// # Errors
119    ///
120    /// Returns [`anyhow::Error`] if:
121    /// - `config.interval_ms` is 0
122    /// - `config.data_type` is `SensorReading` with `sensor_count` of 0
123    pub fn with_dispatch(
124        id: impl Into<String>,
125        config: MockSourceConfig,
126        dispatch_mode: Option<DispatchMode>,
127        dispatch_buffer_capacity: Option<usize>,
128    ) -> Result<Self> {
129        config.validate()?;
130        let id = id.into();
131        let mut params = SourceBaseParams::new(id);
132        if let Some(mode) = dispatch_mode {
133            params = params.with_dispatch_mode(mode);
134        }
135        if let Some(capacity) = dispatch_buffer_capacity {
136            params = params.with_dispatch_buffer_capacity(capacity);
137        }
138        Ok(Self {
139            base: SourceBase::new(params)?,
140            config,
141            seen_sensors: Arc::new(RwLock::new(HashSet::new())),
142        })
143    }
144}
145
146#[async_trait]
147impl Source for MockSource {
148    fn id(&self) -> &str {
149        &self.base.id
150    }
151
152    fn type_name(&self) -> &str {
153        "mock"
154    }
155
156    fn properties(&self) -> HashMap<String, serde_json::Value> {
157        // Serialize through the DTO to get camelCase naming and structured output
158        // matching the creation schema and config file format
159        use crate::descriptor::{DataTypeDto, MockSourceConfigDto};
160        use drasi_plugin_sdk::ConfigValue;
161
162        let data_type_dto = match &self.config.data_type {
163            DataType::Counter => DataTypeDto::Counter,
164            DataType::SensorReading { sensor_count } => DataTypeDto::SensorReading {
165                sensor_count: *sensor_count,
166            },
167            DataType::Generic => DataTypeDto::Generic,
168        };
169
170        let dto = MockSourceConfigDto {
171            data_type: data_type_dto,
172            interval_ms: ConfigValue::Static(self.config.interval_ms),
173        };
174
175        match serde_json::to_value(&dto) {
176            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
177            _ => HashMap::new(),
178        }
179    }
180
181    fn auto_start(&self) -> bool {
182        self.base.get_auto_start()
183    }
184
185    async fn start(&self) -> Result<()> {
186        log_component_start("Mock Source", &self.base.id);
187
188        self.base
189            .set_status(
190                ComponentStatus::Starting,
191                Some("Starting mock source".to_string()),
192            )
193            .await;
194
195        // Get broadcast_tx for publishing
196        let base_dispatchers = self.base.dispatchers.clone();
197        let source_id = self.base.id.clone();
198
199        // Get configuration
200        let data_type = self.config.data_type.clone();
201        let interval_ms = self.config.interval_ms;
202
203        // Clone seen_sensors for the task
204        let seen_sensors = Arc::clone(&self.seen_sensors);
205
206        // Clone seen_sensors for the task
207        let seen_sensors = Arc::clone(&self.seen_sensors);
208
209        // Get instance_id from context for log routing isolation
210        let instance_id = self
211            .base
212            .context()
213            .await
214            .map(|c| c.instance_id)
215            .unwrap_or_default();
216
217        // Start the data generation task with component span for proper log routing
218        let status_handle = self.base.status_handle();
219        let source_name = self.base.id.clone();
220        let source_id_for_span = source_id.clone();
221        let span = tracing::info_span!(
222            "mock_source_task",
223            instance_id = %instance_id,
224            component_id = %source_id_for_span,
225            component_type = "source"
226        );
227        let task = tokio::spawn(
228            async move {
229                // Set Running status inside the task to avoid a race condition where
230                // the loop checks status before the caller sets it after spawn.
231                status_handle
232                    .set_status(
233                        ComponentStatus::Running,
234                        Some("Mock source started successfully".to_string()),
235                    )
236                    .await;
237
238                let mut interval =
239                    tokio::time::interval(tokio::time::Duration::from_millis(interval_ms));
240                let mut seq = 0u64;
241
242                loop {
243                    interval.tick().await;
244
245                    // Check if we should stop
246                    if !matches!(status_handle.get_status().await, ComponentStatus::Running) {
247                        break;
248                    }
249
250                    seq += 1;
251
252                    // Generate data based on type
253                    let source_change = match data_type {
254                        DataType::Counter => {
255                            let element_id = format!("counter_{seq}");
256                            let reference = ElementReference::new(&source_name, &element_id);
257
258                            let mut property_map = ElementPropertyMap::new();
259                            property_map.insert(
260                                "value",
261                                crate::conversion::json_to_element_value_or_default(
262                                    &Value::Number(seq.into()),
263                                    drasi_core::models::ElementValue::Null,
264                                ),
265                            );
266                            property_map.insert(
267                                "timestamp",
268                                crate::conversion::json_to_element_value_or_default(
269                                    &Value::String(chrono::Utc::now().to_rfc3339()),
270                                    drasi_core::models::ElementValue::Null,
271                                ),
272                            );
273
274                            let metadata = ElementMetadata {
275                                reference,
276                                labels: Arc::from(vec![Arc::from("Counter")]),
277                                effective_from: crate::time::get_system_time_millis()
278                                    .unwrap_or_else(|e| {
279                                        log::warn!("Failed to get timestamp for mock counter: {e}");
280                                        chrono::Utc::now().timestamp_millis() as u64
281                                    }),
282                            };
283
284                            let element = Element::Node {
285                                metadata,
286                                properties: property_map,
287                            };
288
289                            SourceChange::Insert { element }
290                        }
291                        DataType::SensorReading { sensor_count } => {
292                            // Constrain sensor_id to the configured number of sensors
293                            let sensor_id = rand::random::<u32>() % sensor_count;
294                            // Use sensor_id as the element_id for stable identity
295                            let element_id = format!("sensor_{sensor_id}");
296                            let reference = ElementReference::new(&source_name, &element_id);
297
298                            let mut property_map = ElementPropertyMap::new();
299                            property_map.insert(
300                                "sensor_id",
301                                crate::conversion::json_to_element_value_or_default(
302                                    &Value::String(format!("sensor_{sensor_id}")),
303                                    drasi_core::models::ElementValue::Null,
304                                ),
305                            );
306                            property_map.insert(
307                                "temperature",
308                                crate::conversion::json_to_element_value_or_default(
309                                    &Value::Number(
310                                        serde_json::Number::from_f64(
311                                            20.0 + rand::random::<f64>() * 10.0,
312                                        )
313                                        .unwrap_or(serde_json::Number::from(25)),
314                                    ),
315                                    drasi_core::models::ElementValue::Null,
316                                ),
317                            );
318                            property_map.insert(
319                                "humidity",
320                                crate::conversion::json_to_element_value_or_default(
321                                    &Value::Number(
322                                        serde_json::Number::from_f64(
323                                            40.0 + rand::random::<f64>() * 20.0,
324                                        )
325                                        .unwrap_or(serde_json::Number::from(50)),
326                                    ),
327                                    drasi_core::models::ElementValue::Null,
328                                ),
329                            );
330                            property_map.insert(
331                                "timestamp",
332                                crate::conversion::json_to_element_value_or_default(
333                                    &Value::String(chrono::Utc::now().to_rfc3339()),
334                                    drasi_core::models::ElementValue::Null,
335                                ),
336                            );
337
338                            let metadata = ElementMetadata {
339                                reference,
340                                labels: Arc::from(vec![Arc::from("SensorReading")]),
341                                effective_from: crate::time::get_system_time_millis()
342                                    .unwrap_or_else(|e| {
343                                        log::warn!("Failed to get timestamp for mock sensor: {e}");
344                                        chrono::Utc::now().timestamp_millis() as u64
345                                    }),
346                            };
347
348                            let element = Element::Node {
349                                metadata,
350                                properties: property_map,
351                            };
352
353                            // Determine if this is a new sensor (Insert) or an update (Update)
354                            let is_new = {
355                                let mut seen = seen_sensors.write().await;
356                                seen.insert(sensor_id)
357                            };
358
359                            if is_new {
360                                SourceChange::Insert { element }
361                            } else {
362                                SourceChange::Update { element }
363                            }
364                        }
365                        DataType::Generic => {
366                            // Generic data
367                            let element_id = format!("generic_{seq}");
368                            let reference = ElementReference::new(&source_name, &element_id);
369
370                            let mut property_map = ElementPropertyMap::new();
371                            property_map.insert(
372                                "value",
373                                crate::conversion::json_to_element_value_or_default(
374                                    &Value::Number(rand::random::<i32>().into()),
375                                    drasi_core::models::ElementValue::Null,
376                                ),
377                            );
378                            property_map.insert(
379                                "message",
380                                crate::conversion::json_to_element_value_or_default(
381                                    &Value::String("Generic mock data".to_string()),
382                                    drasi_core::models::ElementValue::Null,
383                                ),
384                            );
385                            property_map.insert(
386                                "timestamp",
387                                crate::conversion::json_to_element_value_or_default(
388                                    &Value::String(chrono::Utc::now().to_rfc3339()),
389                                    drasi_core::models::ElementValue::Null,
390                                ),
391                            );
392
393                            let metadata = ElementMetadata {
394                                reference,
395                                labels: Arc::from(vec![Arc::from("Generic")]),
396                                effective_from: crate::time::get_system_time_millis()
397                                    .unwrap_or_else(|e| {
398                                        log::warn!("Failed to get timestamp for mock generic: {e}");
399                                        chrono::Utc::now().timestamp_millis() as u64
400                                    }),
401                            };
402
403                            let element = Element::Node {
404                                metadata,
405                                properties: property_map,
406                            };
407
408                            SourceChange::Insert { element }
409                        }
410                    };
411
412                    // Create profiling metadata with timestamps
413                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
414                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
415
416                    let wrapper = SourceEventWrapper::with_profiling(
417                        source_id.clone(),
418                        SourceEvent::Change(source_change),
419                        chrono::Utc::now(),
420                        profiling,
421                    );
422
423                    // Dispatch to all subscribers via helper
424                    if let Err(e) = SourceBase::dispatch_from_task(
425                        base_dispatchers.clone(),
426                        wrapper,
427                        &source_id,
428                    )
429                    .await
430                    {
431                        debug!("Failed to dispatch change: {e}");
432                    }
433                }
434
435                info!("Mock source task completed");
436            }
437            .instrument(span),
438        );
439
440        *self.base.task_handle.write().await = Some(task);
441
442        Ok(())
443    }
444
445    async fn stop(&self) -> Result<()> {
446        log_component_stop("Mock Source", &self.base.id);
447
448        self.base
449            .set_status(
450                ComponentStatus::Stopping,
451                Some("Stopping mock source".to_string()),
452            )
453            .await;
454
455        // Cancel the task
456        if let Some(handle) = self.base.task_handle.write().await.take() {
457            handle.abort();
458            let _ = handle.await;
459        }
460
461        self.base
462            .set_status(
463                ComponentStatus::Stopped,
464                Some("Mock source stopped successfully".to_string()),
465            )
466            .await;
467
468        Ok(())
469    }
470
471    async fn status(&self) -> ComponentStatus {
472        self.base.get_status().await
473    }
474
475    async fn subscribe(
476        &self,
477        settings: drasi_lib::config::SourceSubscriptionSettings,
478    ) -> Result<SubscriptionResponse> {
479        self.base.subscribe_with_bootstrap(&settings, "Mock").await
480    }
481
482    fn as_any(&self) -> &dyn std::any::Any {
483        self
484    }
485
486    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
487        self.base.initialize(context).await;
488    }
489
490    async fn set_bootstrap_provider(
491        &self,
492        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
493    ) {
494        self.base.set_bootstrap_provider(provider).await;
495    }
496}
497
498impl MockSource {
499    /// Injects a custom event into the source for testing purposes.
500    ///
501    /// This allows tests to send specific [`SourceChange`] events (INSERT, UPDATE, DELETE)
502    /// without waiting for automatic generation. Useful for deterministic testing scenarios.
503    ///
504    /// The source does not need to be started to inject events.
505    ///
506    /// # Arguments
507    ///
508    /// * `change` - The [`SourceChange`] to inject (e.g., `SourceChange::Insert { element }`)
509    ///
510    /// # Errors
511    ///
512    /// Returns [`anyhow::Error`] if dispatching fails (e.g., all receivers have been dropped).
513    pub async fn inject_event(&self, change: SourceChange) -> Result<()> {
514        self.base.dispatch_source_change(change).await
515    }
516
517    /// Creates a test subscription to receive events from this source.
518    ///
519    /// This bypasses DrasiLib's subscription mechanism and directly subscribes to
520    /// the source's event dispatcher. Useful for unit testing the source in isolation.
521    ///
522    /// # Returns
523    ///
524    /// A boxed receiver that yields [`SourceEventWrapper`](drasi_lib::channels::SourceEventWrapper)
525    /// for each event generated or injected.
526    ///
527    /// # Example
528    ///
529    /// ```rust,ignore
530    /// let source = MockSource::new("test", config)?;
531    /// let mut rx = source.test_subscribe();
532    ///
533    /// source.start().await?;
534    ///
535    /// // Receive events
536    /// while let Some(event) = rx.recv().await {
537    ///     println!("Received: {:?}", event);
538    /// }
539    /// ```
540    pub fn test_subscribe(
541        &self,
542    ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
543        self.base.test_subscribe()
544    }
545}
546
547/// Builder for [`MockSource`] instances.
548///
549/// Provides a fluent API for constructing mock sources with sensible defaults.
550/// This is the recommended way to create a `MockSource`.
551///
552/// # Defaults
553///
554/// | Option | Default |
555/// |--------|---------|
556/// | `data_type` | [`DataType::Generic`] |
557/// | `interval_ms` | 5000 |
558/// | `dispatch_mode` | `Channel` |
559/// | `dispatch_buffer_capacity` | 1000 |
560/// | `auto_start` | `true` |
561///
562/// # Example
563///
564/// ```rust,ignore
565/// use drasi_source_mock::{MockSource, DataType};
566///
567/// let source = MockSource::builder("my-source")
568///     .with_data_type(DataType::sensor_reading(10))
569///     .with_interval_ms(1000)
570///     .with_auto_start(false)  // Don't start automatically
571///     .build()?;
572/// ```
573pub struct MockSourceBuilder {
574    id: String,
575    data_type: DataType,
576    interval_ms: u64,
577    dispatch_mode: Option<DispatchMode>,
578    dispatch_buffer_capacity: Option<usize>,
579    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
580    auto_start: bool,
581}
582
583impl MockSourceBuilder {
584    /// Create a new builder with the given source ID.
585    ///
586    /// # Arguments
587    ///
588    /// * `id` - Unique identifier for the source instance
589    pub fn new(id: impl Into<String>) -> Self {
590        Self {
591            id: id.into(),
592            data_type: DataType::Generic,
593            interval_ms: 5000,
594            dispatch_mode: None,
595            dispatch_buffer_capacity: None,
596            bootstrap_provider: None,
597            auto_start: true,
598        }
599    }
600
601    /// Set the data type to generate.
602    ///
603    /// # Arguments
604    ///
605    /// * `data_type` - One of: `DataType::Counter`, `DataType::SensorReading { sensor_count }`, or `DataType::Generic` (default)
606    ///
607    /// For SensorReading, use `DataType::sensor_reading(count)` helper method.
608    pub fn with_data_type(mut self, data_type: DataType) -> Self {
609        self.data_type = data_type;
610        self
611    }
612
613    /// Set the generation interval in milliseconds.
614    ///
615    /// # Arguments
616    ///
617    /// * `interval_ms` - Interval between data generation (default: 5000)
618    pub fn with_interval_ms(mut self, interval_ms: u64) -> Self {
619        self.interval_ms = interval_ms;
620        self
621    }
622
623    /// Set the dispatch mode for event routing.
624    ///
625    /// # Arguments
626    ///
627    /// * `mode` - `Channel` (default, with backpressure) or `Broadcast`
628    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
629        self.dispatch_mode = Some(mode);
630        self
631    }
632
633    /// Set the dispatch buffer capacity.
634    ///
635    /// # Arguments
636    ///
637    /// * `capacity` - Buffer size for dispatch channels (default: 1000)
638    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
639        self.dispatch_buffer_capacity = Some(capacity);
640        self
641    }
642
643    /// Set the bootstrap provider for initial data delivery.
644    ///
645    /// # Arguments
646    ///
647    /// * `provider` - Bootstrap provider implementation
648    pub fn with_bootstrap_provider(
649        mut self,
650        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
651    ) -> Self {
652        self.bootstrap_provider = Some(Box::new(provider));
653        self
654    }
655
656    /// Set whether this source should auto-start when DrasiLib starts.
657    ///
658    /// Default is `true`. Set to `false` if this source should only be
659    /// started manually via `start_source()`.
660    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
661        self.auto_start = auto_start;
662        self
663    }
664
665    /// Build the MockSource instance.
666    ///
667    /// # Returns
668    ///
669    /// A fully constructed `MockSource`, or an error if construction fails.
670    ///
671    /// # Errors
672    ///
673    /// Returns an error if:
674    /// - The base source cannot be initialized
675    /// - The configuration is invalid (e.g., interval_ms is 0, sensor_count is 0)
676    pub fn build(self) -> Result<MockSource> {
677        let config = MockSourceConfig {
678            data_type: self.data_type,
679            interval_ms: self.interval_ms,
680        };
681
682        config.validate()?;
683
684        // Build SourceBaseParams with all settings
685        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
686        if let Some(mode) = self.dispatch_mode {
687            params = params.with_dispatch_mode(mode);
688        }
689        if let Some(capacity) = self.dispatch_buffer_capacity {
690            params = params.with_dispatch_buffer_capacity(capacity);
691        }
692        if let Some(provider) = self.bootstrap_provider {
693            params = params.with_bootstrap_provider(provider);
694        }
695
696        Ok(MockSource {
697            base: SourceBase::new(params)?,
698            config,
699            seen_sensors: Arc::new(RwLock::new(HashSet::new())),
700        })
701    }
702}
703
704impl MockSource {
705    /// Create a builder for MockSource with the given ID.
706    ///
707    /// This is the recommended way to construct a MockSource.
708    ///
709    /// # Arguments
710    ///
711    /// * `id` - Unique identifier for the source instance
712    ///
713    /// # Example
714    ///
715    /// ```rust,ignore
716    /// let source = MockSource::builder("my-source")
717    ///     .with_data_type(DataType::sensor_reading(10))
718    ///     .with_interval_ms(1000)
719    ///     .build()?;
720    /// ```
721    pub fn builder(id: impl Into<String>) -> MockSourceBuilder {
722        MockSourceBuilder::new(id)
723    }
724}