Skip to main content

drasi_source_application/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Application Source Plugin for Drasi
17//!
18//! This plugin enables programmatic event injection into Drasi's continuous query
19//! processing pipeline. Unlike other sources that connect to external data systems,
20//! the application source allows your Rust code to directly send graph data changes.
21//!
22//! # Architecture
23//!
24//! The application source uses a handle-based pattern:
25//! - **`ApplicationSource`**: The source component that processes events
26//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
27//!
28//! # API Overview
29//!
30//! The `ApplicationSourceHandle` provides high-level methods for common operations:
31//!
32//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
33//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
34//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
35//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
36//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes efficiently
37//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
38//!
39//! # Building Properties
40//!
41//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
42//!
43//! ```rust,ignore
44//! use drasi_source_application::PropertyMapBuilder;
45//!
46//! let props = PropertyMapBuilder::new()
47//!     .string("name", "Alice")
48//!     .integer("age", 30)
49//!     .float("score", 95.5)
50//!     .bool("active", true)
51//!     .build();
52//! ```
53//!
54//! # Configuration
55//!
56//! The application source has minimal configuration since it receives events programmatically
57//! rather than connecting to an external system.
58//!
59//! | Field | Type | Default | Description |
60//! |-------|------|---------|-------------|
61//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
62//!
63//! # Bootstrap Support
64//!
65//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
66//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
67//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
68//! stored events, or any other `BootstrapProvider` implementation.
69//!
70//! # Example Configuration (YAML)
71//!
72//! ```yaml
73//! source_type: application
74//! properties: {}
75//! ```
76//!
77//! # Usage Example
78//!
79//! ```rust,ignore
80//! use drasi_source_application::{
81//!     ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
82//!     PropertyMapBuilder
83//! };
84//! use std::sync::Arc;
85//!
86//! // Create the source and handle
87//! let config = ApplicationSourceConfig::default();
88//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
89//!
90//! // Add source to Drasi
91//! let source = Arc::new(source);
92//! drasi.add_source(source).await?;
93//!
94//! // Clone handle for use in different parts of your application
95//! let handle_clone = handle.clone();
96//!
97//! // Insert a node
98//! let props = PropertyMapBuilder::new()
99//!     .string("name", "Alice")
100//!     .integer("age", 30)
101//!     .build();
102//!
103//! handle.send_node_insert("user-1", vec!["User"], props).await?;
104//!
105//! // Insert a relationship
106//! let rel_props = PropertyMapBuilder::new()
107//!     .string("since", "2024-01-01")
108//!     .build();
109//!
110//! handle.send_relation_insert(
111//!     "follows-1",
112//!     vec!["FOLLOWS"],
113//!     rel_props,
114//!     "user-1",  // start node
115//!     "user-2",  // end node
116//! ).await?;
117//!
118//! // Update a node
119//! let updated_props = PropertyMapBuilder::new()
120//!     .integer("age", 31)
121//!     .build();
122//!
123//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
124//!
125//! // Delete a node
126//! handle.send_delete("user-1", vec!["User"]).await?;
127//! ```
128//!
129//! # Use Cases
130//!
131//! - **Testing**: Inject test data directly without setting up external sources
132//! - **Integration**: Bridge between your application logic and Drasi queries
133//! - **Simulation**: Generate synthetic events for development and demos
134//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
135
136pub mod config;
137pub use config::ApplicationSourceConfig;
138
139mod property_builder;
140mod time;
141
142#[cfg(test)]
143mod tests;
144
145pub use property_builder::PropertyMapBuilder;
146
147use anyhow::Result;
148use async_trait::async_trait;
149use log::{debug, info, warn};
150use std::collections::HashMap;
151use std::sync::Arc;
152use tokio::sync::{mpsc, RwLock};
153
154use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
155use drasi_lib::channels::{ComponentStatus, *};
156use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
157use drasi_lib::Source;
158use tracing::Instrument;
159
160/// Handle for programmatic event injection into an Application Source
161///
162/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
163/// (node inserts, updates, deletes, and relationship inserts) directly from your application
164/// code into the Drasi continuous query processing pipeline.
165#[derive(Clone)]
166pub struct ApplicationSourceHandle {
167    tx: mpsc::Sender<SourceChange>,
168    source_id: String,
169}
170
171impl ApplicationSourceHandle {
172    /// Send a raw source change event
173    pub async fn send(&self, change: SourceChange) -> Result<()> {
174        self.tx
175            .send(change)
176            .await
177            .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
178        Ok(())
179    }
180
181    /// Insert a new node into the graph
182    pub async fn send_node_insert(
183        &self,
184        element_id: impl Into<Arc<str>>,
185        labels: Vec<impl Into<Arc<str>>>,
186        properties: drasi_core::models::ElementPropertyMap,
187    ) -> Result<()> {
188        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
189            warn!("Failed to get timestamp for node insert: {e}, using fallback");
190            chrono::Utc::now().timestamp_millis() as u64
191        });
192
193        let element = Element::Node {
194            metadata: ElementMetadata {
195                reference: ElementReference {
196                    source_id: Arc::from(self.source_id.as_str()),
197                    element_id: element_id.into(),
198                },
199                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
200                effective_from,
201            },
202            properties,
203        };
204
205        self.send(SourceChange::Insert { element }).await
206    }
207
208    /// Update an existing node in the graph
209    pub async fn send_node_update(
210        &self,
211        element_id: impl Into<Arc<str>>,
212        labels: Vec<impl Into<Arc<str>>>,
213        properties: drasi_core::models::ElementPropertyMap,
214    ) -> Result<()> {
215        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
216            warn!("Failed to get timestamp for node update: {e}, using fallback");
217            chrono::Utc::now().timestamp_millis() as u64
218        });
219
220        let element = Element::Node {
221            metadata: ElementMetadata {
222                reference: ElementReference {
223                    source_id: Arc::from(self.source_id.as_str()),
224                    element_id: element_id.into(),
225                },
226                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
227                effective_from,
228            },
229            properties,
230        };
231
232        self.send(SourceChange::Update { element }).await
233    }
234
235    /// Delete a node or relationship from the graph
236    pub async fn send_delete(
237        &self,
238        element_id: impl Into<Arc<str>>,
239        labels: Vec<impl Into<Arc<str>>>,
240    ) -> Result<()> {
241        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
242            warn!("Failed to get timestamp for delete: {e}, using fallback");
243            chrono::Utc::now().timestamp_millis() as u64
244        });
245
246        let metadata = ElementMetadata {
247            reference: ElementReference {
248                source_id: Arc::from(self.source_id.as_str()),
249                element_id: element_id.into(),
250            },
251            labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
252            effective_from,
253        };
254
255        self.send(SourceChange::Delete { metadata }).await
256    }
257
258    /// Insert a new relationship into the graph
259    pub async fn send_relation_insert(
260        &self,
261        element_id: impl Into<Arc<str>>,
262        labels: Vec<impl Into<Arc<str>>>,
263        properties: drasi_core::models::ElementPropertyMap,
264        start_node_id: impl Into<Arc<str>>,
265        end_node_id: impl Into<Arc<str>>,
266    ) -> Result<()> {
267        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
268            warn!("Failed to get timestamp for relation insert: {e}, using fallback");
269            chrono::Utc::now().timestamp_millis() as u64
270        });
271
272        let element = Element::Relation {
273            metadata: ElementMetadata {
274                reference: ElementReference {
275                    source_id: Arc::from(self.source_id.as_str()),
276                    element_id: element_id.into(),
277                },
278                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
279                effective_from,
280            },
281            properties,
282            in_node: ElementReference {
283                source_id: Arc::from(self.source_id.as_str()),
284                element_id: start_node_id.into(),
285            },
286            out_node: ElementReference {
287                source_id: Arc::from(self.source_id.as_str()),
288                element_id: end_node_id.into(),
289            },
290        };
291
292        self.send(SourceChange::Insert { element }).await
293    }
294
295    /// Send a batch of source changes efficiently
296    pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
297        for change in changes {
298            self.send(change).await?;
299        }
300        Ok(())
301    }
302
303    /// Get the source ID that this handle is connected to
304    pub fn source_id(&self) -> &str {
305        &self.source_id
306    }
307}
308
/// A source that allows applications to programmatically inject events.
///
/// This source receives events from an [`ApplicationSourceHandle`] and forwards
/// them to the Drasi query processing pipeline.
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
/// - `config`: Application source configuration
/// - `app_rx`: Receiver for events from the handle
/// - `app_tx`: Sender for creating additional handles
pub struct ApplicationSource {
    /// Base source implementation providing common functionality
    /// (ID, status, dispatchers, task handle, bootstrap provider).
    base: SourceBase,
    /// Application source configuration; its `properties` map is what
    /// `Source::properties()` returns.
    config: ApplicationSourceConfig,
    /// Receiver for events from handles. Held as `Some` until event processing
    /// starts, at which point it is taken out and moved into the processing task.
    app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
    /// Sender half of the same channel, cloned whenever a new handle is created.
    app_tx: mpsc::Sender<SourceChange>,
}
330
331impl ApplicationSource {
332    /// Create a new application source and its handle.
333    ///
334    /// The event channel is automatically injected when the source is added
335    /// to DrasiLib via `add_source()`.
336    ///
337    /// # Arguments
338    ///
339    /// * `id` - Unique identifier for this source instance
340    /// * `config` - Application source configuration
341    ///
342    /// # Returns
343    ///
344    /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
345    /// can be used to send events to the source.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the base source cannot be initialized.
350    ///
351    /// # Example
352    ///
353    /// ```rust,ignore
354    /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
355    ///
356    /// let config = ApplicationSourceConfig::default();
357    /// let (source, handle) = ApplicationSource::new("my-source", config)?;
358    /// ```
359    pub fn new(
360        id: impl Into<String>,
361        config: ApplicationSourceConfig,
362    ) -> Result<(Self, ApplicationSourceHandle)> {
363        let id = id.into();
364        let params = SourceBaseParams::new(id.clone());
365        let (app_tx, app_rx) = mpsc::channel(1000);
366
367        let handle = ApplicationSourceHandle {
368            tx: app_tx.clone(),
369            source_id: id.clone(),
370        };
371
372        let source = Self {
373            base: SourceBase::new(params)?,
374            config,
375            app_rx: Arc::new(RwLock::new(Some(app_rx))),
376            app_tx,
377        };
378
379        Ok((source, handle))
380    }
381
382    /// Get a new handle for this source
383    pub fn get_handle(&self) -> ApplicationSourceHandle {
384        ApplicationSourceHandle {
385            tx: self.app_tx.clone(),
386            source_id: self.base.id.clone(),
387        }
388    }
389
390    async fn process_events(&self) -> Result<()> {
391        let mut rx = self
392            .app_rx
393            .write()
394            .await
395            .take()
396            .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
397
398        let source_name = self.base.id.clone();
399        let base_dispatchers = self.base.dispatchers.clone();
400        let reporter = self.base.status_handle();
401        let source_id = self.base.id.clone();
402
403        // Get instance_id from context for log routing isolation
404        let instance_id = self
405            .base
406            .context()
407            .await
408            .map(|c| c.instance_id)
409            .unwrap_or_default();
410
411        let span = tracing::info_span!(
412            "application_source_processor",
413            instance_id = %instance_id,
414            component_id = %source_id,
415            component_type = "source"
416        );
417        let handle = tokio::spawn(
418            async move {
419                info!("ApplicationSource '{source_name}' event processor started");
420
421                reporter
422                    .set_status(
423                        ComponentStatus::Running,
424                        Some("Processing events".to_string()),
425                    )
426                    .await;
427
428                while let Some(change) = rx.recv().await {
429                    debug!("ApplicationSource '{source_name}' received event: {change:?}");
430
431                    let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
432                    profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
433
434                    let wrapper = SourceEventWrapper::with_profiling(
435                        source_name.clone(),
436                        SourceEvent::Change(change),
437                        chrono::Utc::now(),
438                        profiling,
439                    );
440
441                    if let Err(e) = SourceBase::dispatch_from_task(
442                        base_dispatchers.clone(),
443                        wrapper,
444                        &source_name,
445                    )
446                    .await
447                    {
448                        debug!("Failed to dispatch change (no subscribers): {e}");
449                    }
450                }
451
452                info!("ApplicationSource '{source_name}' event processor stopped");
453            }
454            .instrument(span),
455        );
456
457        *self.base.task_handle.write().await = Some(handle);
458        Ok(())
459    }
460}
461
462#[async_trait]
463impl Source for ApplicationSource {
464    fn id(&self) -> &str {
465        &self.base.id
466    }
467
468    fn type_name(&self) -> &str {
469        "application"
470    }
471
472    fn properties(&self) -> HashMap<String, serde_json::Value> {
473        self.config.properties.clone()
474    }
475
476    fn auto_start(&self) -> bool {
477        self.base.get_auto_start()
478    }
479
480    async fn start(&self) -> Result<()> {
481        info!("Starting ApplicationSource '{}'", self.base.id);
482
483        self.base
484            .set_status(
485                ComponentStatus::Starting,
486                Some("Starting application source".to_string()),
487            )
488            .await;
489
490        self.process_events().await?;
491
492        Ok(())
493    }
494
495    async fn stop(&self) -> Result<()> {
496        info!("Stopping ApplicationSource '{}'", self.base.id);
497
498        self.base
499            .set_status(
500                ComponentStatus::Stopping,
501                Some("Stopping application source".to_string()),
502            )
503            .await;
504
505        if let Some(handle) = self.base.task_handle.write().await.take() {
506            handle.abort();
507        }
508
509        self.base
510            .set_status(
511                ComponentStatus::Stopped,
512                Some("Application source stopped".to_string()),
513            )
514            .await;
515
516        Ok(())
517    }
518
519    async fn status(&self) -> ComponentStatus {
520        self.base.get_status().await
521    }
522
523    async fn subscribe(
524        &self,
525        settings: drasi_lib::config::SourceSubscriptionSettings,
526    ) -> Result<SubscriptionResponse> {
527        self.base
528            .subscribe_with_bootstrap(&settings, "Application")
529            .await
530    }
531
532    fn as_any(&self) -> &dyn std::any::Any {
533        self
534    }
535
536    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
537        self.base.initialize(context).await;
538    }
539
540    async fn set_bootstrap_provider(
541        &self,
542        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
543    ) {
544        self.base.set_bootstrap_provider(provider).await;
545    }
546}