Skip to main content

drasi_source_application/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Application Source Plugin for Drasi
16//!
17//! This plugin enables programmatic event injection into Drasi's continuous query
18//! processing pipeline. Unlike other sources that connect to external data systems,
19//! the application source allows your Rust code to directly send graph data changes.
20//!
21//! # Architecture
22//!
23//! The application source uses a handle-based pattern:
24//! - **`ApplicationSource`**: The source component that processes events
25//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
26//!
27//! # API Overview
28//!
29//! The `ApplicationSourceHandle` provides high-level methods for common operations:
30//!
31//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
32//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
33//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
34//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
35//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes efficiently
36//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
37//!
38//! # Building Properties
39//!
40//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
41//!
42//! ```rust,ignore
43//! use drasi_source_application::PropertyMapBuilder;
44//!
45//! let props = PropertyMapBuilder::new()
46//!     .string("name", "Alice")
47//!     .integer("age", 30)
48//!     .float("score", 95.5)
49//!     .bool("active", true)
50//!     .build();
51//! ```
52//!
53//! # Configuration
54//!
55//! The application source has minimal configuration since it receives events programmatically
56//! rather than connecting to an external system.
57//!
58//! | Field | Type | Default | Description |
59//! |-------|------|---------|-------------|
60//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
61//!
62//! # Bootstrap Support
63//!
64//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
65//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
66//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
67//! stored events, or any other `BootstrapProvider` implementation.
68//!
69//! # Example Configuration (YAML)
70//!
71//! ```yaml
72//! source_type: application
73//! properties: {}
74//! ```
75//!
76//! # Usage Example
77//!
78//! ```rust,ignore
79//! use drasi_source_application::{
80//!     ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
81//!     PropertyMapBuilder
82//! };
83//! use std::sync::Arc;
84//!
85//! // Create the source and handle
86//! let config = ApplicationSourceConfig::default();
87//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
88//!
89//! // Add source to Drasi
90//! let source = Arc::new(source);
91//! drasi.add_source(source).await?;
92//!
93//! // Clone handle for use in different parts of your application
94//! let handle_clone = handle.clone();
95//!
96//! // Insert a node
97//! let props = PropertyMapBuilder::new()
98//!     .string("name", "Alice")
99//!     .integer("age", 30)
100//!     .build();
101//!
102//! handle.send_node_insert("user-1", vec!["User"], props).await?;
103//!
104//! // Insert a relationship
105//! let rel_props = PropertyMapBuilder::new()
106//!     .string("since", "2024-01-01")
107//!     .build();
108//!
109//! handle.send_relation_insert(
110//!     "follows-1",
111//!     vec!["FOLLOWS"],
112//!     rel_props,
113//!     "user-1",  // start node
114//!     "user-2",  // end node
115//! ).await?;
116//!
117//! // Update a node
118//! let updated_props = PropertyMapBuilder::new()
119//!     .integer("age", 31)
120//!     .build();
121//!
122//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
123//!
124//! // Delete a node
125//! handle.send_delete("user-1", vec!["User"]).await?;
126//! ```
127//!
128//! # Use Cases
129//!
130//! - **Testing**: Inject test data directly without setting up external sources
131//! - **Integration**: Bridge between your application logic and Drasi queries
132//! - **Simulation**: Generate synthetic events for development and demos
133//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
134
135pub mod config;
136pub use config::ApplicationSourceConfig;
137
138mod property_builder;
139mod time;
140
141#[cfg(test)]
142mod tests;
143
144pub use property_builder::PropertyMapBuilder;
145
146use anyhow::Result;
147use async_trait::async_trait;
148use log::{debug, info, warn};
149use std::collections::HashMap;
150use std::sync::Arc;
151use tokio::sync::{mpsc, RwLock};
152
153use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
154use drasi_lib::channels::{ComponentEventSender, ComponentStatus, ComponentType, *};
155use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
156use drasi_lib::Source;
157use tracing::Instrument;
158
159/// Handle for programmatic event injection into an Application Source
160///
161/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
162/// (node inserts, updates, deletes, and relationship inserts) directly from your application
163/// code into the Drasi continuous query processing pipeline.
164#[derive(Clone)]
165pub struct ApplicationSourceHandle {
166    tx: mpsc::Sender<SourceChange>,
167    source_id: String,
168}
169
170impl ApplicationSourceHandle {
171    /// Send a raw source change event
172    pub async fn send(&self, change: SourceChange) -> Result<()> {
173        self.tx
174            .send(change)
175            .await
176            .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
177        Ok(())
178    }
179
180    /// Insert a new node into the graph
181    pub async fn send_node_insert(
182        &self,
183        element_id: impl Into<Arc<str>>,
184        labels: Vec<impl Into<Arc<str>>>,
185        properties: drasi_core::models::ElementPropertyMap,
186    ) -> Result<()> {
187        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
188            warn!("Failed to get timestamp for node insert: {e}, using fallback");
189            chrono::Utc::now().timestamp_millis() as u64
190        });
191
192        let element = Element::Node {
193            metadata: ElementMetadata {
194                reference: ElementReference {
195                    source_id: Arc::from(self.source_id.as_str()),
196                    element_id: element_id.into(),
197                },
198                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
199                effective_from,
200            },
201            properties,
202        };
203
204        self.send(SourceChange::Insert { element }).await
205    }
206
207    /// Update an existing node in the graph
208    pub async fn send_node_update(
209        &self,
210        element_id: impl Into<Arc<str>>,
211        labels: Vec<impl Into<Arc<str>>>,
212        properties: drasi_core::models::ElementPropertyMap,
213    ) -> Result<()> {
214        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
215            warn!("Failed to get timestamp for node update: {e}, using fallback");
216            chrono::Utc::now().timestamp_millis() as u64
217        });
218
219        let element = Element::Node {
220            metadata: ElementMetadata {
221                reference: ElementReference {
222                    source_id: Arc::from(self.source_id.as_str()),
223                    element_id: element_id.into(),
224                },
225                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
226                effective_from,
227            },
228            properties,
229        };
230
231        self.send(SourceChange::Update { element }).await
232    }
233
234    /// Delete a node or relationship from the graph
235    pub async fn send_delete(
236        &self,
237        element_id: impl Into<Arc<str>>,
238        labels: Vec<impl Into<Arc<str>>>,
239    ) -> Result<()> {
240        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
241            warn!("Failed to get timestamp for delete: {e}, using fallback");
242            chrono::Utc::now().timestamp_millis() as u64
243        });
244
245        let metadata = ElementMetadata {
246            reference: ElementReference {
247                source_id: Arc::from(self.source_id.as_str()),
248                element_id: element_id.into(),
249            },
250            labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
251            effective_from,
252        };
253
254        self.send(SourceChange::Delete { metadata }).await
255    }
256
257    /// Insert a new relationship into the graph
258    pub async fn send_relation_insert(
259        &self,
260        element_id: impl Into<Arc<str>>,
261        labels: Vec<impl Into<Arc<str>>>,
262        properties: drasi_core::models::ElementPropertyMap,
263        start_node_id: impl Into<Arc<str>>,
264        end_node_id: impl Into<Arc<str>>,
265    ) -> Result<()> {
266        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
267            warn!("Failed to get timestamp for relation insert: {e}, using fallback");
268            chrono::Utc::now().timestamp_millis() as u64
269        });
270
271        let element = Element::Relation {
272            metadata: ElementMetadata {
273                reference: ElementReference {
274                    source_id: Arc::from(self.source_id.as_str()),
275                    element_id: element_id.into(),
276                },
277                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
278                effective_from,
279            },
280            properties,
281            in_node: ElementReference {
282                source_id: Arc::from(self.source_id.as_str()),
283                element_id: end_node_id.into(),
284            },
285            out_node: ElementReference {
286                source_id: Arc::from(self.source_id.as_str()),
287                element_id: start_node_id.into(),
288            },
289        };
290
291        self.send(SourceChange::Insert { element }).await
292    }
293
294    /// Send a batch of source changes efficiently
295    pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
296        for change in changes {
297            self.send(change).await?;
298        }
299        Ok(())
300    }
301
302    /// Get the source ID that this handle is connected to
303    pub fn source_id(&self) -> &str {
304        &self.source_id
305    }
306}
307
308/// A source that allows applications to programmatically inject events.
309///
310/// This source receives events from an [`ApplicationSourceHandle`] and forwards
311/// them to the Drasi query processing pipeline.
312///
313/// # Fields
314///
315/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
316/// - `config`: Application source configuration
317/// - `app_rx`: Receiver for events from the handle
318/// - `app_tx`: Sender for creating additional handles
319pub struct ApplicationSource {
320    /// Base source implementation providing common functionality
321    base: SourceBase,
322    /// Application source configuration
323    config: ApplicationSourceConfig,
324    /// Receiver for events from handles (taken when processing starts)
325    app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
326    /// Sender for creating new handles
327    app_tx: mpsc::Sender<SourceChange>,
328}
329
330impl ApplicationSource {
331    /// Create a new application source and its handle.
332    ///
333    /// The event channel is automatically injected when the source is added
334    /// to DrasiLib via `add_source()`.
335    ///
336    /// # Arguments
337    ///
338    /// * `id` - Unique identifier for this source instance
339    /// * `config` - Application source configuration
340    ///
341    /// # Returns
342    ///
343    /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
344    /// can be used to send events to the source.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the base source cannot be initialized.
349    ///
350    /// # Example
351    ///
352    /// ```rust,ignore
353    /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
354    ///
355    /// let config = ApplicationSourceConfig::default();
356    /// let (source, handle) = ApplicationSource::new("my-source", config)?;
357    /// ```
358    pub fn new(
359        id: impl Into<String>,
360        config: ApplicationSourceConfig,
361    ) -> Result<(Self, ApplicationSourceHandle)> {
362        let id = id.into();
363        let params = SourceBaseParams::new(id.clone());
364        let (app_tx, app_rx) = mpsc::channel(1000);
365
366        let handle = ApplicationSourceHandle {
367            tx: app_tx.clone(),
368            source_id: id.clone(),
369        };
370
371        let source = Self {
372            base: SourceBase::new(params)?,
373            config,
374            app_rx: Arc::new(RwLock::new(Some(app_rx))),
375            app_tx,
376        };
377
378        Ok((source, handle))
379    }
380
381    /// Get a new handle for this source
382    pub fn get_handle(&self) -> ApplicationSourceHandle {
383        ApplicationSourceHandle {
384            tx: self.app_tx.clone(),
385            source_id: self.base.id.clone(),
386        }
387    }
388
389    async fn process_events(&self) -> Result<()> {
390        let mut rx = self
391            .app_rx
392            .write()
393            .await
394            .take()
395            .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
396
397        let source_name = self.base.id.clone();
398        let base_dispatchers = self.base.dispatchers.clone();
399        let status_tx = self.base.status_tx();
400        let status = self.base.status.clone();
401        let source_id = self.base.id.clone();
402
403        // Get instance_id from context for log routing isolation
404        let instance_id = self
405            .base
406            .context()
407            .await
408            .map(|c| c.instance_id)
409            .unwrap_or_default();
410
411        let span = tracing::info_span!(
412            "application_source_processor",
413            instance_id = %instance_id,
414            component_id = %source_id,
415            component_type = "source"
416        );
417        let handle = tokio::spawn(
418            async move {
419                info!("ApplicationSource '{source_name}' event processor started");
420
421                if let Some(ref tx) = *status_tx.read().await {
422                    let _ = tx
423                        .send(ComponentEvent {
424                            component_id: source_name.clone(),
425                            component_type: ComponentType::Source,
426                            status: ComponentStatus::Running,
427                            timestamp: chrono::Utc::now(),
428                            message: Some("Processing events".to_string()),
429                        })
430                        .await;
431                }
432
433                *status.write().await = ComponentStatus::Running;
434
435                while let Some(change) = rx.recv().await {
436                    debug!("ApplicationSource '{source_name}' received event: {change:?}");
437
438                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
439                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
440
441                    let wrapper = SourceEventWrapper::with_profiling(
442                        source_name.clone(),
443                        SourceEvent::Change(change),
444                        chrono::Utc::now(),
445                        profiling,
446                    );
447
448                    if let Err(e) = SourceBase::dispatch_from_task(
449                        base_dispatchers.clone(),
450                        wrapper,
451                        &source_name,
452                    )
453                    .await
454                    {
455                        debug!("Failed to dispatch change (no subscribers): {e}");
456                    }
457                }
458
459                info!("ApplicationSource '{source_name}' event processor stopped");
460            }
461            .instrument(span),
462        );
463
464        *self.base.task_handle.write().await = Some(handle);
465        Ok(())
466    }
467}
468
469#[async_trait]
470impl Source for ApplicationSource {
471    fn id(&self) -> &str {
472        &self.base.id
473    }
474
475    fn type_name(&self) -> &str {
476        "application"
477    }
478
479    fn properties(&self) -> HashMap<String, serde_json::Value> {
480        self.config.properties.clone()
481    }
482
483    fn auto_start(&self) -> bool {
484        self.base.get_auto_start()
485    }
486
487    async fn start(&self) -> Result<()> {
488        info!("Starting ApplicationSource '{}'", self.base.id);
489
490        self.base
491            .set_status_with_event(
492                ComponentStatus::Starting,
493                Some("Starting application source".to_string()),
494            )
495            .await?;
496
497        self.process_events().await?;
498
499        Ok(())
500    }
501
502    async fn stop(&self) -> Result<()> {
503        info!("Stopping ApplicationSource '{}'", self.base.id);
504
505        self.base
506            .set_status_with_event(
507                ComponentStatus::Stopping,
508                Some("Stopping application source".to_string()),
509            )
510            .await?;
511
512        if let Some(handle) = self.base.task_handle.write().await.take() {
513            handle.abort();
514        }
515
516        self.base
517            .set_status_with_event(
518                ComponentStatus::Stopped,
519                Some("Application source stopped".to_string()),
520            )
521            .await?;
522
523        Ok(())
524    }
525
526    async fn status(&self) -> ComponentStatus {
527        self.base.status.read().await.clone()
528    }
529
530    async fn subscribe(
531        &self,
532        settings: drasi_lib::config::SourceSubscriptionSettings,
533    ) -> Result<SubscriptionResponse> {
534        self.base
535            .subscribe_with_bootstrap(&settings, "Application")
536            .await
537    }
538
539    fn as_any(&self) -> &dyn std::any::Any {
540        self
541    }
542
543    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
544        self.base.initialize(context).await;
545    }
546
547    async fn set_bootstrap_provider(
548        &self,
549        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
550    ) {
551        self.base.set_bootstrap_provider(provider).await;
552    }
553}