drasi_source_mock/mock.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::config::{DataType, MockSourceConfig};
16use anyhow::Result;
17use async_trait::async_trait;
18use drasi_core::models::{
19 Element, ElementMetadata, ElementPropertyMap, ElementReference, SourceChange,
20};
21use log::{debug, info};
22use serde_json::Value;
23use std::collections::{HashMap, HashSet};
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27use drasi_lib::channels::*;
28use drasi_lib::managers::{log_component_start, log_component_stop};
29use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
30use drasi_lib::Source;
31use tracing::Instrument;
32
33/// Mock source that generates synthetic data for testing and development.
34///
35/// This source runs an internal tokio task that generates data at configurable
36/// intervals. It supports different data types (Counter, SensorReading, Generic)
37/// to simulate various real-world scenarios.
38///
39/// # Event Generation Behavior
40///
41/// - **Counter**: Always emits INSERT events with sequential IDs
42/// - **SensorReading**: Emits INSERT for first reading per sensor, UPDATE thereafter
43/// - **Generic**: Always emits INSERT events with sequential IDs
44///
45/// # Thread Safety
46///
47/// This type is `Send + Sync` and can be safely shared across threads.
48/// Internal state is protected by `RwLock`.
49pub struct MockSource {
50 /// Base source implementation providing dispatchers, status tracking, and lifecycle management.
51 base: SourceBase,
52
53 /// Configuration specifying data type and generation interval.
54 config: MockSourceConfig,
55
56 /// Tracks sensor IDs that have been seen for INSERT vs UPDATE logic.
57 /// Only used when `config.data_type` is `SensorReading`.
58 seen_sensors: Arc<RwLock<HashSet<u32>>>,
59}
60
61impl MockSource {
62 /// Creates a new `MockSource` with the given ID and configuration.
63 ///
64 /// The source is created in a stopped state. Call [`start()`](Self::start) to begin
65 /// generating events, or add it to DrasiLib which will start it automatically
66 /// (unless `auto_start` is disabled via the builder).
67 ///
68 /// # Arguments
69 ///
70 /// * `id` - Unique identifier for this source instance. Must be unique within a DrasiLib instance.
71 /// * `config` - Configuration specifying data type and generation interval.
72 ///
73 /// # Returns
74 ///
75 /// A new `MockSource` instance, or an error if validation fails.
76 ///
77 /// # Errors
78 ///
79 /// Returns [`anyhow::Error`] if:
80 /// - `config.interval_ms` is 0 (would cause spin loop)
81 /// - `config.data_type` is `SensorReading` with `sensor_count` of 0
82 ///
83 /// # Example
84 ///
85 /// ```rust,ignore
86 /// use drasi_source_mock::{MockSource, MockSourceConfig, DataType};
87 ///
88 /// let config = MockSourceConfig {
89 /// data_type: DataType::sensor_reading(10),
90 /// interval_ms: 1000,
91 /// };
92 ///
93 /// let source = MockSource::new("my-mock-source", config)?;
94 /// ```
95 pub fn new(id: impl Into<String>, config: MockSourceConfig) -> Result<Self> {
96 config.validate()?;
97 let id = id.into();
98 let params = SourceBaseParams::new(id);
99 Ok(Self {
100 base: SourceBase::new(params)?,
101 config,
102 seen_sensors: Arc::new(RwLock::new(HashSet::new())),
103 })
104 }
105
106 /// Creates a new `MockSource` with custom dispatch settings.
107 ///
108 /// This is a lower-level constructor for advanced use cases where you need
109 /// control over event dispatching. For most cases, prefer [`MockSource::builder()`].
110 ///
111 /// # Arguments
112 ///
113 /// * `id` - Unique identifier for this source instance.
114 /// * `config` - Configuration specifying data type and generation interval.
115 /// * `dispatch_mode` - Optional dispatch mode (`Channel` or `Broadcast`).
116 /// * `dispatch_buffer_capacity` - Optional buffer size for dispatch channels.
117 ///
118 /// # Errors
119 ///
120 /// Returns [`anyhow::Error`] if:
121 /// - `config.interval_ms` is 0
122 /// - `config.data_type` is `SensorReading` with `sensor_count` of 0
123 pub fn with_dispatch(
124 id: impl Into<String>,
125 config: MockSourceConfig,
126 dispatch_mode: Option<DispatchMode>,
127 dispatch_buffer_capacity: Option<usize>,
128 ) -> Result<Self> {
129 config.validate()?;
130 let id = id.into();
131 let mut params = SourceBaseParams::new(id);
132 if let Some(mode) = dispatch_mode {
133 params = params.with_dispatch_mode(mode);
134 }
135 if let Some(capacity) = dispatch_buffer_capacity {
136 params = params.with_dispatch_buffer_capacity(capacity);
137 }
138 Ok(Self {
139 base: SourceBase::new(params)?,
140 config,
141 seen_sensors: Arc::new(RwLock::new(HashSet::new())),
142 })
143 }
144}
145
146#[async_trait]
147impl Source for MockSource {
148 fn id(&self) -> &str {
149 &self.base.id
150 }
151
152 fn type_name(&self) -> &str {
153 "mock"
154 }
155
156 fn properties(&self) -> HashMap<String, serde_json::Value> {
157 // Serialize through the DTO to get camelCase naming and structured output
158 // matching the creation schema and config file format
159 use crate::descriptor::{DataTypeDto, MockSourceConfigDto};
160 use drasi_plugin_sdk::ConfigValue;
161
162 let data_type_dto = match &self.config.data_type {
163 DataType::Counter => DataTypeDto::Counter,
164 DataType::SensorReading { sensor_count } => DataTypeDto::SensorReading {
165 sensor_count: *sensor_count,
166 },
167 DataType::Generic => DataTypeDto::Generic,
168 };
169
170 let dto = MockSourceConfigDto {
171 data_type: data_type_dto,
172 interval_ms: ConfigValue::Static(self.config.interval_ms),
173 };
174
175 match serde_json::to_value(&dto) {
176 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
177 _ => HashMap::new(),
178 }
179 }
180
181 fn auto_start(&self) -> bool {
182 self.base.get_auto_start()
183 }
184
185 async fn start(&self) -> Result<()> {
186 log_component_start("Mock Source", &self.base.id);
187
188 self.base
189 .set_status(
190 ComponentStatus::Starting,
191 Some("Starting mock source".to_string()),
192 )
193 .await;
194
195 // Get broadcast_tx for publishing
196 let base_dispatchers = self.base.dispatchers.clone();
197 let source_id = self.base.id.clone();
198
199 // Get configuration
200 let data_type = self.config.data_type.clone();
201 let interval_ms = self.config.interval_ms;
202
203 // Clone seen_sensors for the task
204 let seen_sensors = Arc::clone(&self.seen_sensors);
205
206 // Clone seen_sensors for the task
207 let seen_sensors = Arc::clone(&self.seen_sensors);
208
209 // Get instance_id from context for log routing isolation
210 let instance_id = self
211 .base
212 .context()
213 .await
214 .map(|c| c.instance_id)
215 .unwrap_or_default();
216
217 // Start the data generation task with component span for proper log routing
218 let status_handle = self.base.status_handle();
219 let source_name = self.base.id.clone();
220 let source_id_for_span = source_id.clone();
221 let span = tracing::info_span!(
222 "mock_source_task",
223 instance_id = %instance_id,
224 component_id = %source_id_for_span,
225 component_type = "source"
226 );
227 let task = tokio::spawn(
228 async move {
229 // Set Running status inside the task to avoid a race condition where
230 // the loop checks status before the caller sets it after spawn.
231 status_handle
232 .set_status(
233 ComponentStatus::Running,
234 Some("Mock source started successfully".to_string()),
235 )
236 .await;
237
238 let mut interval =
239 tokio::time::interval(tokio::time::Duration::from_millis(interval_ms));
240 let mut seq = 0u64;
241
242 loop {
243 interval.tick().await;
244
245 // Check if we should stop
246 if !matches!(status_handle.get_status().await, ComponentStatus::Running) {
247 break;
248 }
249
250 seq += 1;
251
252 // Generate data based on type
253 let source_change = match data_type {
254 DataType::Counter => {
255 let element_id = format!("counter_{seq}");
256 let reference = ElementReference::new(&source_name, &element_id);
257
258 let mut property_map = ElementPropertyMap::new();
259 property_map.insert(
260 "value",
261 crate::conversion::json_to_element_value_or_default(
262 &Value::Number(seq.into()),
263 drasi_core::models::ElementValue::Null,
264 ),
265 );
266 property_map.insert(
267 "timestamp",
268 crate::conversion::json_to_element_value_or_default(
269 &Value::String(chrono::Utc::now().to_rfc3339()),
270 drasi_core::models::ElementValue::Null,
271 ),
272 );
273
274 let metadata = ElementMetadata {
275 reference,
276 labels: Arc::from(vec![Arc::from("Counter")]),
277 effective_from: crate::time::get_system_time_millis()
278 .unwrap_or_else(|e| {
279 log::warn!("Failed to get timestamp for mock counter: {e}");
280 chrono::Utc::now().timestamp_millis() as u64
281 }),
282 };
283
284 let element = Element::Node {
285 metadata,
286 properties: property_map,
287 };
288
289 SourceChange::Insert { element }
290 }
291 DataType::SensorReading { sensor_count } => {
292 // Constrain sensor_id to the configured number of sensors
293 let sensor_id = rand::random::<u32>() % sensor_count;
294 // Use sensor_id as the element_id for stable identity
295 let element_id = format!("sensor_{sensor_id}");
296 let reference = ElementReference::new(&source_name, &element_id);
297
298 let mut property_map = ElementPropertyMap::new();
299 property_map.insert(
300 "sensor_id",
301 crate::conversion::json_to_element_value_or_default(
302 &Value::String(format!("sensor_{sensor_id}")),
303 drasi_core::models::ElementValue::Null,
304 ),
305 );
306 property_map.insert(
307 "temperature",
308 crate::conversion::json_to_element_value_or_default(
309 &Value::Number(
310 serde_json::Number::from_f64(
311 20.0 + rand::random::<f64>() * 10.0,
312 )
313 .unwrap_or(serde_json::Number::from(25)),
314 ),
315 drasi_core::models::ElementValue::Null,
316 ),
317 );
318 property_map.insert(
319 "humidity",
320 crate::conversion::json_to_element_value_or_default(
321 &Value::Number(
322 serde_json::Number::from_f64(
323 40.0 + rand::random::<f64>() * 20.0,
324 )
325 .unwrap_or(serde_json::Number::from(50)),
326 ),
327 drasi_core::models::ElementValue::Null,
328 ),
329 );
330 property_map.insert(
331 "timestamp",
332 crate::conversion::json_to_element_value_or_default(
333 &Value::String(chrono::Utc::now().to_rfc3339()),
334 drasi_core::models::ElementValue::Null,
335 ),
336 );
337
338 let metadata = ElementMetadata {
339 reference,
340 labels: Arc::from(vec![Arc::from("SensorReading")]),
341 effective_from: crate::time::get_system_time_millis()
342 .unwrap_or_else(|e| {
343 log::warn!("Failed to get timestamp for mock sensor: {e}");
344 chrono::Utc::now().timestamp_millis() as u64
345 }),
346 };
347
348 let element = Element::Node {
349 metadata,
350 properties: property_map,
351 };
352
353 // Determine if this is a new sensor (Insert) or an update (Update)
354 let is_new = {
355 let mut seen = seen_sensors.write().await;
356 seen.insert(sensor_id)
357 };
358
359 if is_new {
360 SourceChange::Insert { element }
361 } else {
362 SourceChange::Update { element }
363 }
364 }
365 DataType::Generic => {
366 // Generic data
367 let element_id = format!("generic_{seq}");
368 let reference = ElementReference::new(&source_name, &element_id);
369
370 let mut property_map = ElementPropertyMap::new();
371 property_map.insert(
372 "value",
373 crate::conversion::json_to_element_value_or_default(
374 &Value::Number(rand::random::<i32>().into()),
375 drasi_core::models::ElementValue::Null,
376 ),
377 );
378 property_map.insert(
379 "message",
380 crate::conversion::json_to_element_value_or_default(
381 &Value::String("Generic mock data".to_string()),
382 drasi_core::models::ElementValue::Null,
383 ),
384 );
385 property_map.insert(
386 "timestamp",
387 crate::conversion::json_to_element_value_or_default(
388 &Value::String(chrono::Utc::now().to_rfc3339()),
389 drasi_core::models::ElementValue::Null,
390 ),
391 );
392
393 let metadata = ElementMetadata {
394 reference,
395 labels: Arc::from(vec![Arc::from("Generic")]),
396 effective_from: crate::time::get_system_time_millis()
397 .unwrap_or_else(|e| {
398 log::warn!("Failed to get timestamp for mock generic: {e}");
399 chrono::Utc::now().timestamp_millis() as u64
400 }),
401 };
402
403 let element = Element::Node {
404 metadata,
405 properties: property_map,
406 };
407
408 SourceChange::Insert { element }
409 }
410 };
411
412 // Create profiling metadata with timestamps
413 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
414 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
415
416 let wrapper = SourceEventWrapper::with_profiling(
417 source_id.clone(),
418 SourceEvent::Change(source_change),
419 chrono::Utc::now(),
420 profiling,
421 );
422
423 // Dispatch to all subscribers via helper
424 if let Err(e) = SourceBase::dispatch_from_task(
425 base_dispatchers.clone(),
426 wrapper,
427 &source_id,
428 )
429 .await
430 {
431 debug!("Failed to dispatch change: {e}");
432 }
433 }
434
435 info!("Mock source task completed");
436 }
437 .instrument(span),
438 );
439
440 *self.base.task_handle.write().await = Some(task);
441
442 Ok(())
443 }
444
445 async fn stop(&self) -> Result<()> {
446 log_component_stop("Mock Source", &self.base.id);
447
448 self.base
449 .set_status(
450 ComponentStatus::Stopping,
451 Some("Stopping mock source".to_string()),
452 )
453 .await;
454
455 // Cancel the task
456 if let Some(handle) = self.base.task_handle.write().await.take() {
457 handle.abort();
458 let _ = handle.await;
459 }
460
461 self.base
462 .set_status(
463 ComponentStatus::Stopped,
464 Some("Mock source stopped successfully".to_string()),
465 )
466 .await;
467
468 Ok(())
469 }
470
471 async fn status(&self) -> ComponentStatus {
472 self.base.get_status().await
473 }
474
475 async fn subscribe(
476 &self,
477 settings: drasi_lib::config::SourceSubscriptionSettings,
478 ) -> Result<SubscriptionResponse> {
479 self.base.subscribe_with_bootstrap(&settings, "Mock").await
480 }
481
482 fn as_any(&self) -> &dyn std::any::Any {
483 self
484 }
485
486 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
487 self.base.initialize(context).await;
488 }
489
490 async fn set_bootstrap_provider(
491 &self,
492 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
493 ) {
494 self.base.set_bootstrap_provider(provider).await;
495 }
496}
497
498impl MockSource {
499 /// Injects a custom event into the source for testing purposes.
500 ///
501 /// This allows tests to send specific [`SourceChange`] events (INSERT, UPDATE, DELETE)
502 /// without waiting for automatic generation. Useful for deterministic testing scenarios.
503 ///
504 /// The source does not need to be started to inject events.
505 ///
506 /// # Arguments
507 ///
508 /// * `change` - The [`SourceChange`] to inject (e.g., `SourceChange::Insert { element }`)
509 ///
510 /// # Errors
511 ///
512 /// Returns [`anyhow::Error`] if dispatching fails (e.g., all receivers have been dropped).
513 pub async fn inject_event(&self, change: SourceChange) -> Result<()> {
514 self.base.dispatch_source_change(change).await
515 }
516
517 /// Creates a test subscription to receive events from this source.
518 ///
519 /// This bypasses DrasiLib's subscription mechanism and directly subscribes to
520 /// the source's event dispatcher. Useful for unit testing the source in isolation.
521 ///
522 /// # Returns
523 ///
524 /// A boxed receiver that yields [`SourceEventWrapper`](drasi_lib::channels::SourceEventWrapper)
525 /// for each event generated or injected.
526 ///
527 /// # Example
528 ///
529 /// ```rust,ignore
530 /// let source = MockSource::new("test", config)?;
531 /// let mut rx = source.test_subscribe();
532 ///
533 /// source.start().await?;
534 ///
535 /// // Receive events
536 /// while let Some(event) = rx.recv().await {
537 /// println!("Received: {:?}", event);
538 /// }
539 /// ```
540 pub fn test_subscribe(
541 &self,
542 ) -> Box<dyn drasi_lib::channels::ChangeReceiver<drasi_lib::channels::SourceEventWrapper>> {
543 self.base.test_subscribe()
544 }
545}
546
547/// Builder for [`MockSource`] instances.
548///
549/// Provides a fluent API for constructing mock sources with sensible defaults.
550/// This is the recommended way to create a `MockSource`.
551///
552/// # Defaults
553///
554/// | Option | Default |
555/// |--------|---------|
556/// | `data_type` | [`DataType::Generic`] |
557/// | `interval_ms` | 5000 |
558/// | `dispatch_mode` | `Channel` |
559/// | `dispatch_buffer_capacity` | 1000 |
560/// | `auto_start` | `true` |
561///
562/// # Example
563///
564/// ```rust,ignore
565/// use drasi_source_mock::{MockSource, DataType};
566///
567/// let source = MockSource::builder("my-source")
568/// .with_data_type(DataType::sensor_reading(10))
569/// .with_interval_ms(1000)
570/// .with_auto_start(false) // Don't start automatically
571/// .build()?;
572/// ```
573pub struct MockSourceBuilder {
574 id: String,
575 data_type: DataType,
576 interval_ms: u64,
577 dispatch_mode: Option<DispatchMode>,
578 dispatch_buffer_capacity: Option<usize>,
579 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
580 auto_start: bool,
581}
582
583impl MockSourceBuilder {
584 /// Create a new builder with the given source ID.
585 ///
586 /// # Arguments
587 ///
588 /// * `id` - Unique identifier for the source instance
589 pub fn new(id: impl Into<String>) -> Self {
590 Self {
591 id: id.into(),
592 data_type: DataType::Generic,
593 interval_ms: 5000,
594 dispatch_mode: None,
595 dispatch_buffer_capacity: None,
596 bootstrap_provider: None,
597 auto_start: true,
598 }
599 }
600
601 /// Set the data type to generate.
602 ///
603 /// # Arguments
604 ///
605 /// * `data_type` - One of: `DataType::Counter`, `DataType::SensorReading { sensor_count }`, or `DataType::Generic` (default)
606 ///
607 /// For SensorReading, use `DataType::sensor_reading(count)` helper method.
608 pub fn with_data_type(mut self, data_type: DataType) -> Self {
609 self.data_type = data_type;
610 self
611 }
612
613 /// Set the generation interval in milliseconds.
614 ///
615 /// # Arguments
616 ///
617 /// * `interval_ms` - Interval between data generation (default: 5000)
618 pub fn with_interval_ms(mut self, interval_ms: u64) -> Self {
619 self.interval_ms = interval_ms;
620 self
621 }
622
623 /// Set the dispatch mode for event routing.
624 ///
625 /// # Arguments
626 ///
627 /// * `mode` - `Channel` (default, with backpressure) or `Broadcast`
628 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
629 self.dispatch_mode = Some(mode);
630 self
631 }
632
633 /// Set the dispatch buffer capacity.
634 ///
635 /// # Arguments
636 ///
637 /// * `capacity` - Buffer size for dispatch channels (default: 1000)
638 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
639 self.dispatch_buffer_capacity = Some(capacity);
640 self
641 }
642
643 /// Set the bootstrap provider for initial data delivery.
644 ///
645 /// # Arguments
646 ///
647 /// * `provider` - Bootstrap provider implementation
648 pub fn with_bootstrap_provider(
649 mut self,
650 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
651 ) -> Self {
652 self.bootstrap_provider = Some(Box::new(provider));
653 self
654 }
655
656 /// Set whether this source should auto-start when DrasiLib starts.
657 ///
658 /// Default is `true`. Set to `false` if this source should only be
659 /// started manually via `start_source()`.
660 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
661 self.auto_start = auto_start;
662 self
663 }
664
665 /// Build the MockSource instance.
666 ///
667 /// # Returns
668 ///
669 /// A fully constructed `MockSource`, or an error if construction fails.
670 ///
671 /// # Errors
672 ///
673 /// Returns an error if:
674 /// - The base source cannot be initialized
675 /// - The configuration is invalid (e.g., interval_ms is 0, sensor_count is 0)
676 pub fn build(self) -> Result<MockSource> {
677 let config = MockSourceConfig {
678 data_type: self.data_type,
679 interval_ms: self.interval_ms,
680 };
681
682 config.validate()?;
683
684 // Build SourceBaseParams with all settings
685 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
686 if let Some(mode) = self.dispatch_mode {
687 params = params.with_dispatch_mode(mode);
688 }
689 if let Some(capacity) = self.dispatch_buffer_capacity {
690 params = params.with_dispatch_buffer_capacity(capacity);
691 }
692 if let Some(provider) = self.bootstrap_provider {
693 params = params.with_bootstrap_provider(provider);
694 }
695
696 Ok(MockSource {
697 base: SourceBase::new(params)?,
698 config,
699 seen_sensors: Arc::new(RwLock::new(HashSet::new())),
700 })
701 }
702}
703
704impl MockSource {
705 /// Create a builder for MockSource with the given ID.
706 ///
707 /// This is the recommended way to construct a MockSource.
708 ///
709 /// # Arguments
710 ///
711 /// * `id` - Unique identifier for the source instance
712 ///
713 /// # Example
714 ///
715 /// ```rust,ignore
716 /// let source = MockSource::builder("my-source")
717 /// .with_data_type(DataType::sensor_reading(10))
718 /// .with_interval_ms(1000)
719 /// .build()?;
720 /// ```
721 pub fn builder(id: impl Into<String>) -> MockSourceBuilder {
722 MockSourceBuilder::new(id)
723 }
724}