Skip to main content

drasi_source_application/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Application Source Plugin for Drasi
17//!
18//! This plugin enables programmatic event injection into Drasi's continuous query
19//! processing pipeline. Unlike other sources that connect to external data systems,
20//! the application source allows your Rust code to directly send graph data changes.
21//!
22//! # Architecture
23//!
24//! The application source uses a handle-based pattern:
25//! - **`ApplicationSource`**: The source component that processes events
26//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
27//!
28//! # API Overview
29//!
30//! The `ApplicationSourceHandle` provides high-level methods for common operations:
31//!
32//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
33//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
34//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
35//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
36//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes efficiently
37//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
38//!
39//! # Building Properties
40//!
41//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
42//!
43//! ```rust,ignore
44//! use drasi_source_application::PropertyMapBuilder;
45//!
46//! let props = PropertyMapBuilder::new()
47//!     .string("name", "Alice")
48//!     .integer("age", 30)
49//!     .float("score", 95.5)
50//!     .bool("active", true)
51//!     .build();
52//! ```
53//!
54//! # Configuration
55//!
56//! The application source has minimal configuration since it receives events programmatically
57//! rather than connecting to an external system.
58//!
59//! | Field | Type | Default | Description |
60//! |-------|------|---------|-------------|
61//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
62//!
63//! # Bootstrap Support
64//!
65//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
66//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
67//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
68//! stored events, or any other `BootstrapProvider` implementation.
69//!
70//! # Example Configuration (YAML)
71//!
72//! ```yaml
73//! source_type: application
74//! properties: {}
75//! ```
76//!
77//! # Usage Example
78//!
79//! ```rust,ignore
80//! use drasi_source_application::{
81//!     ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
82//!     PropertyMapBuilder
83//! };
84//! use std::sync::Arc;
85//!
86//! // Create the source and handle
87//! let config = ApplicationSourceConfig::default();
88//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
89//!
90//! // Add source to Drasi
91//! let source = Arc::new(source);
92//! drasi.add_source(source).await?;
93//!
94//! // Clone handle for use in different parts of your application
95//! let handle_clone = handle.clone();
96//!
97//! // Insert a node
98//! let props = PropertyMapBuilder::new()
99//!     .string("name", "Alice")
100//!     .integer("age", 30)
101//!     .build();
102//!
103//! handle.send_node_insert("user-1", vec!["User"], props).await?;
104//!
105//! // Insert a relationship
106//! let rel_props = PropertyMapBuilder::new()
107//!     .string("since", "2024-01-01")
108//!     .build();
109//!
110//! handle.send_relation_insert(
111//!     "follows-1",
112//!     vec!["FOLLOWS"],
113//!     rel_props,
114//!     "user-1",  // start node
115//!     "user-2",  // end node
116//! ).await?;
117//!
118//! // Update a node
119//! let updated_props = PropertyMapBuilder::new()
120//!     .integer("age", 31)
121//!     .build();
122//!
123//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
124//!
125//! // Delete a node
126//! handle.send_delete("user-1", vec!["User"]).await?;
127//! ```
128//!
129//! # Use Cases
130//!
131//! - **Testing**: Inject test data directly without setting up external sources
132//! - **Integration**: Bridge between your application logic and Drasi queries
133//! - **Simulation**: Generate synthetic events for development and demos
134//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
135
136pub mod config;
137pub use config::ApplicationSourceConfig;
138
139mod property_builder;
140mod time;
141
142#[cfg(test)]
143mod tests;
144
145pub use property_builder::PropertyMapBuilder;
146
147use anyhow::Result;
148use async_trait::async_trait;
149use log::{debug, info, warn};
150use std::collections::HashMap;
151use std::sync::Arc;
152use tokio::sync::{mpsc, RwLock};
153
154use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
155use drasi_lib::channels::{ComponentStatus, ComponentType, *};
156use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
157use drasi_lib::Source;
158use tracing::Instrument;
159
160/// Handle for programmatic event injection into an Application Source
161///
162/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
163/// (node inserts, updates, deletes, and relationship inserts) directly from your application
164/// code into the Drasi continuous query processing pipeline.
165#[derive(Clone)]
166pub struct ApplicationSourceHandle {
167    tx: mpsc::Sender<SourceChange>,
168    source_id: String,
169}
170
171impl ApplicationSourceHandle {
172    /// Send a raw source change event
173    pub async fn send(&self, change: SourceChange) -> Result<()> {
174        self.tx
175            .send(change)
176            .await
177            .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
178        Ok(())
179    }
180
181    /// Insert a new node into the graph
182    pub async fn send_node_insert(
183        &self,
184        element_id: impl Into<Arc<str>>,
185        labels: Vec<impl Into<Arc<str>>>,
186        properties: drasi_core::models::ElementPropertyMap,
187    ) -> Result<()> {
188        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
189            warn!("Failed to get timestamp for node insert: {e}, using fallback");
190            chrono::Utc::now().timestamp_millis() as u64
191        });
192
193        let element = Element::Node {
194            metadata: ElementMetadata {
195                reference: ElementReference {
196                    source_id: Arc::from(self.source_id.as_str()),
197                    element_id: element_id.into(),
198                },
199                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
200                effective_from,
201            },
202            properties,
203        };
204
205        self.send(SourceChange::Insert { element }).await
206    }
207
208    /// Update an existing node in the graph
209    pub async fn send_node_update(
210        &self,
211        element_id: impl Into<Arc<str>>,
212        labels: Vec<impl Into<Arc<str>>>,
213        properties: drasi_core::models::ElementPropertyMap,
214    ) -> Result<()> {
215        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
216            warn!("Failed to get timestamp for node update: {e}, using fallback");
217            chrono::Utc::now().timestamp_millis() as u64
218        });
219
220        let element = Element::Node {
221            metadata: ElementMetadata {
222                reference: ElementReference {
223                    source_id: Arc::from(self.source_id.as_str()),
224                    element_id: element_id.into(),
225                },
226                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
227                effective_from,
228            },
229            properties,
230        };
231
232        self.send(SourceChange::Update { element }).await
233    }
234
235    /// Delete a node or relationship from the graph
236    pub async fn send_delete(
237        &self,
238        element_id: impl Into<Arc<str>>,
239        labels: Vec<impl Into<Arc<str>>>,
240    ) -> Result<()> {
241        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
242            warn!("Failed to get timestamp for delete: {e}, using fallback");
243            chrono::Utc::now().timestamp_millis() as u64
244        });
245
246        let metadata = ElementMetadata {
247            reference: ElementReference {
248                source_id: Arc::from(self.source_id.as_str()),
249                element_id: element_id.into(),
250            },
251            labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
252            effective_from,
253        };
254
255        self.send(SourceChange::Delete { metadata }).await
256    }
257
258    /// Insert a new relationship into the graph
259    pub async fn send_relation_insert(
260        &self,
261        element_id: impl Into<Arc<str>>,
262        labels: Vec<impl Into<Arc<str>>>,
263        properties: drasi_core::models::ElementPropertyMap,
264        start_node_id: impl Into<Arc<str>>,
265        end_node_id: impl Into<Arc<str>>,
266    ) -> Result<()> {
267        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
268            warn!("Failed to get timestamp for relation insert: {e}, using fallback");
269            chrono::Utc::now().timestamp_millis() as u64
270        });
271
272        let element = Element::Relation {
273            metadata: ElementMetadata {
274                reference: ElementReference {
275                    source_id: Arc::from(self.source_id.as_str()),
276                    element_id: element_id.into(),
277                },
278                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
279                effective_from,
280            },
281            properties,
282            in_node: ElementReference {
283                source_id: Arc::from(self.source_id.as_str()),
284                element_id: start_node_id.into(),
285            },
286            out_node: ElementReference {
287                source_id: Arc::from(self.source_id.as_str()),
288                element_id: end_node_id.into(),
289            },
290        };
291
292        self.send(SourceChange::Insert { element }).await
293    }
294
295    /// Send a batch of source changes efficiently
296    pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
297        for change in changes {
298            self.send(change).await?;
299        }
300        Ok(())
301    }
302
303    /// Get the source ID that this handle is connected to
304    pub fn source_id(&self) -> &str {
305        &self.source_id
306    }
307}
308
309/// A source that allows applications to programmatically inject events.
310///
311/// This source receives events from an [`ApplicationSourceHandle`] and forwards
312/// them to the Drasi query processing pipeline.
313///
314/// # Fields
315///
316/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
317/// - `config`: Application source configuration
318/// - `app_rx`: Receiver for events from the handle
319/// - `app_tx`: Sender for creating additional handles
320pub struct ApplicationSource {
321    /// Base source implementation providing common functionality
322    base: SourceBase,
323    /// Application source configuration
324    config: ApplicationSourceConfig,
325    /// Receiver for events from handles (taken when processing starts)
326    app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
327    /// Sender for creating new handles
328    app_tx: mpsc::Sender<SourceChange>,
329}
330
331impl ApplicationSource {
332    /// Create a new application source and its handle.
333    ///
334    /// The event channel is automatically injected when the source is added
335    /// to DrasiLib via `add_source()`.
336    ///
337    /// # Arguments
338    ///
339    /// * `id` - Unique identifier for this source instance
340    /// * `config` - Application source configuration
341    ///
342    /// # Returns
343    ///
344    /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
345    /// can be used to send events to the source.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the base source cannot be initialized.
350    ///
351    /// # Example
352    ///
353    /// ```rust,ignore
354    /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
355    ///
356    /// let config = ApplicationSourceConfig::default();
357    /// let (source, handle) = ApplicationSource::new("my-source", config)?;
358    /// ```
359    pub fn new(
360        id: impl Into<String>,
361        config: ApplicationSourceConfig,
362    ) -> Result<(Self, ApplicationSourceHandle)> {
363        let id = id.into();
364        let params = SourceBaseParams::new(id.clone());
365        let (app_tx, app_rx) = mpsc::channel(1000);
366
367        let handle = ApplicationSourceHandle {
368            tx: app_tx.clone(),
369            source_id: id.clone(),
370        };
371
372        let source = Self {
373            base: SourceBase::new(params)?,
374            config,
375            app_rx: Arc::new(RwLock::new(Some(app_rx))),
376            app_tx,
377        };
378
379        Ok((source, handle))
380    }
381
382    /// Get a new handle for this source
383    pub fn get_handle(&self) -> ApplicationSourceHandle {
384        ApplicationSourceHandle {
385            tx: self.app_tx.clone(),
386            source_id: self.base.id.clone(),
387        }
388    }
389
390    async fn process_events(&self) -> Result<()> {
391        let mut rx = self
392            .app_rx
393            .write()
394            .await
395            .take()
396            .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
397
398        let source_name = self.base.id.clone();
399        let base_dispatchers = self.base.dispatchers.clone();
400        let status_tx = self.base.status_tx();
401        let status = self.base.status.clone();
402        let source_id = self.base.id.clone();
403
404        // Get instance_id from context for log routing isolation
405        let instance_id = self
406            .base
407            .context()
408            .await
409            .map(|c| c.instance_id)
410            .unwrap_or_default();
411
412        let span = tracing::info_span!(
413            "application_source_processor",
414            instance_id = %instance_id,
415            component_id = %source_id,
416            component_type = "source"
417        );
418        let handle = tokio::spawn(
419            async move {
420                info!("ApplicationSource '{source_name}' event processor started");
421
422                if let Some(ref tx) = *status_tx.read().await {
423                    let _ = tx
424                        .send(ComponentEvent {
425                            component_id: source_name.clone(),
426                            component_type: ComponentType::Source,
427                            status: ComponentStatus::Running,
428                            timestamp: chrono::Utc::now(),
429                            message: Some("Processing events".to_string()),
430                        })
431                        .await;
432                }
433
434                *status.write().await = ComponentStatus::Running;
435
436                while let Some(change) = rx.recv().await {
437                    debug!("ApplicationSource '{source_name}' received event: {change:?}");
438
439                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
440                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
441
442                    let wrapper = SourceEventWrapper::with_profiling(
443                        source_name.clone(),
444                        SourceEvent::Change(change),
445                        chrono::Utc::now(),
446                        profiling,
447                    );
448
449                    if let Err(e) = SourceBase::dispatch_from_task(
450                        base_dispatchers.clone(),
451                        wrapper,
452                        &source_name,
453                    )
454                    .await
455                    {
456                        debug!("Failed to dispatch change (no subscribers): {e}");
457                    }
458                }
459
460                info!("ApplicationSource '{source_name}' event processor stopped");
461            }
462            .instrument(span),
463        );
464
465        *self.base.task_handle.write().await = Some(handle);
466        Ok(())
467    }
468}
469
470#[async_trait]
471impl Source for ApplicationSource {
472    fn id(&self) -> &str {
473        &self.base.id
474    }
475
476    fn type_name(&self) -> &str {
477        "application"
478    }
479
480    fn properties(&self) -> HashMap<String, serde_json::Value> {
481        self.config.properties.clone()
482    }
483
484    fn auto_start(&self) -> bool {
485        self.base.get_auto_start()
486    }
487
488    async fn start(&self) -> Result<()> {
489        info!("Starting ApplicationSource '{}'", self.base.id);
490
491        self.base
492            .set_status_with_event(
493                ComponentStatus::Starting,
494                Some("Starting application source".to_string()),
495            )
496            .await?;
497
498        self.process_events().await?;
499
500        Ok(())
501    }
502
503    async fn stop(&self) -> Result<()> {
504        info!("Stopping ApplicationSource '{}'", self.base.id);
505
506        self.base
507            .set_status_with_event(
508                ComponentStatus::Stopping,
509                Some("Stopping application source".to_string()),
510            )
511            .await?;
512
513        if let Some(handle) = self.base.task_handle.write().await.take() {
514            handle.abort();
515        }
516
517        self.base
518            .set_status_with_event(
519                ComponentStatus::Stopped,
520                Some("Application source stopped".to_string()),
521            )
522            .await?;
523
524        Ok(())
525    }
526
527    async fn status(&self) -> ComponentStatus {
528        self.base.status.read().await.clone()
529    }
530
531    async fn subscribe(
532        &self,
533        settings: drasi_lib::config::SourceSubscriptionSettings,
534    ) -> Result<SubscriptionResponse> {
535        self.base
536            .subscribe_with_bootstrap(&settings, "Application")
537            .await
538    }
539
540    fn as_any(&self) -> &dyn std::any::Any {
541        self
542    }
543
544    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
545        self.base.initialize(context).await;
546    }
547
548    async fn set_bootstrap_provider(
549        &self,
550        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
551    ) {
552        self.base.set_bootstrap_provider(provider).await;
553    }
554}