Skip to main content

drasi_source_application/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Application Source Plugin for Drasi
16//!
17//! This plugin enables programmatic event injection into Drasi's continuous query
18//! processing pipeline. Unlike other sources that connect to external data systems,
19//! the application source allows your Rust code to directly send graph data changes.
20//!
21//! # Architecture
22//!
23//! The application source uses a handle-based pattern:
24//! - **`ApplicationSource`**: The source component that processes events
25//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
26//!
27//! # API Overview
28//!
29//! The `ApplicationSourceHandle` provides high-level methods for common operations:
30//!
31//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
32//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
33//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
34//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
35//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes efficiently
36//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
37//!
38//! # Building Properties
39//!
40//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
41//!
42//! ```rust,ignore
43//! use drasi_source_application::PropertyMapBuilder;
44//!
45//! let props = PropertyMapBuilder::new()
46//!     .string("name", "Alice")
47//!     .integer("age", 30)
48//!     .float("score", 95.5)
49//!     .bool("active", true)
50//!     .build();
51//! ```
52//!
53//! # Configuration
54//!
55//! The application source has minimal configuration since it receives events programmatically
56//! rather than connecting to an external system.
57//!
58//! | Field | Type | Default | Description |
59//! |-------|------|---------|-------------|
60//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
61//!
62//! # Bootstrap Support
63//!
64//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
65//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
66//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
67//! stored events, or any other `BootstrapProvider` implementation.
68//!
69//! # Example Configuration (YAML)
70//!
71//! ```yaml
72//! source_type: application
73//! properties: {}
74//! ```
75//!
76//! # Usage Example
77//!
78//! ```rust,ignore
79//! use drasi_source_application::{
80//!     ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
81//!     PropertyMapBuilder
82//! };
83//! use std::sync::Arc;
84//!
85//! // Create the source and handle
86//! let config = ApplicationSourceConfig::default();
87//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
88//!
89//! // Add source to Drasi
90//! let source = Arc::new(source);
91//! drasi.add_source(source).await?;
92//!
93//! // Clone handle for use in different parts of your application
94//! let handle_clone = handle.clone();
95//!
96//! // Insert a node
97//! let props = PropertyMapBuilder::new()
98//!     .string("name", "Alice")
99//!     .integer("age", 30)
100//!     .build();
101//!
102//! handle.send_node_insert("user-1", vec!["User"], props).await?;
103//!
104//! // Insert a relationship
105//! let rel_props = PropertyMapBuilder::new()
106//!     .string("since", "2024-01-01")
107//!     .build();
108//!
109//! handle.send_relation_insert(
110//!     "follows-1",
111//!     vec!["FOLLOWS"],
112//!     rel_props,
113//!     "user-1",  // start node
114//!     "user-2",  // end node
115//! ).await?;
116//!
117//! // Update a node
118//! let updated_props = PropertyMapBuilder::new()
119//!     .integer("age", 31)
120//!     .build();
121//!
122//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
123//!
124//! // Delete a node
125//! handle.send_delete("user-1", vec!["User"]).await?;
126//! ```
127//!
128//! # Use Cases
129//!
130//! - **Testing**: Inject test data directly without setting up external sources
131//! - **Integration**: Bridge between your application logic and Drasi queries
132//! - **Simulation**: Generate synthetic events for development and demos
133//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
134
135pub mod config;
136pub use config::ApplicationSourceConfig;
137
138mod property_builder;
139mod time;
140
141#[cfg(test)]
142mod tests;
143
144pub use property_builder::PropertyMapBuilder;
145
146use anyhow::Result;
147use async_trait::async_trait;
148use log::{debug, info, warn};
149use std::collections::HashMap;
150use std::sync::Arc;
151use tokio::sync::{mpsc, RwLock};
152
153use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
154use drasi_lib::channels::{ComponentEventSender, ComponentStatus, ComponentType, *};
155use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
156use drasi_lib::Source;
157
158/// Handle for programmatic event injection into an Application Source
159///
160/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
161/// (node inserts, updates, deletes, and relationship inserts) directly from your application
162/// code into the Drasi continuous query processing pipeline.
163#[derive(Clone)]
164pub struct ApplicationSourceHandle {
165    tx: mpsc::Sender<SourceChange>,
166    source_id: String,
167}
168
169impl ApplicationSourceHandle {
170    /// Send a raw source change event
171    pub async fn send(&self, change: SourceChange) -> Result<()> {
172        self.tx
173            .send(change)
174            .await
175            .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
176        Ok(())
177    }
178
179    /// Insert a new node into the graph
180    pub async fn send_node_insert(
181        &self,
182        element_id: impl Into<Arc<str>>,
183        labels: Vec<impl Into<Arc<str>>>,
184        properties: drasi_core::models::ElementPropertyMap,
185    ) -> Result<()> {
186        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
187            warn!("Failed to get timestamp for node insert: {e}, using fallback");
188            chrono::Utc::now().timestamp_millis() as u64
189        });
190
191        let element = Element::Node {
192            metadata: ElementMetadata {
193                reference: ElementReference {
194                    source_id: Arc::from(self.source_id.as_str()),
195                    element_id: element_id.into(),
196                },
197                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
198                effective_from,
199            },
200            properties,
201        };
202
203        self.send(SourceChange::Insert { element }).await
204    }
205
206    /// Update an existing node in the graph
207    pub async fn send_node_update(
208        &self,
209        element_id: impl Into<Arc<str>>,
210        labels: Vec<impl Into<Arc<str>>>,
211        properties: drasi_core::models::ElementPropertyMap,
212    ) -> Result<()> {
213        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
214            warn!("Failed to get timestamp for node update: {e}, using fallback");
215            chrono::Utc::now().timestamp_millis() as u64
216        });
217
218        let element = Element::Node {
219            metadata: ElementMetadata {
220                reference: ElementReference {
221                    source_id: Arc::from(self.source_id.as_str()),
222                    element_id: element_id.into(),
223                },
224                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
225                effective_from,
226            },
227            properties,
228        };
229
230        self.send(SourceChange::Update { element }).await
231    }
232
233    /// Delete a node or relationship from the graph
234    pub async fn send_delete(
235        &self,
236        element_id: impl Into<Arc<str>>,
237        labels: Vec<impl Into<Arc<str>>>,
238    ) -> Result<()> {
239        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
240            warn!("Failed to get timestamp for delete: {e}, using fallback");
241            chrono::Utc::now().timestamp_millis() as u64
242        });
243
244        let metadata = ElementMetadata {
245            reference: ElementReference {
246                source_id: Arc::from(self.source_id.as_str()),
247                element_id: element_id.into(),
248            },
249            labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
250            effective_from,
251        };
252
253        self.send(SourceChange::Delete { metadata }).await
254    }
255
256    /// Insert a new relationship into the graph
257    pub async fn send_relation_insert(
258        &self,
259        element_id: impl Into<Arc<str>>,
260        labels: Vec<impl Into<Arc<str>>>,
261        properties: drasi_core::models::ElementPropertyMap,
262        start_node_id: impl Into<Arc<str>>,
263        end_node_id: impl Into<Arc<str>>,
264    ) -> Result<()> {
265        let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
266            warn!("Failed to get timestamp for relation insert: {e}, using fallback");
267            chrono::Utc::now().timestamp_millis() as u64
268        });
269
270        let element = Element::Relation {
271            metadata: ElementMetadata {
272                reference: ElementReference {
273                    source_id: Arc::from(self.source_id.as_str()),
274                    element_id: element_id.into(),
275                },
276                labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
277                effective_from,
278            },
279            properties,
280            in_node: ElementReference {
281                source_id: Arc::from(self.source_id.as_str()),
282                element_id: end_node_id.into(),
283            },
284            out_node: ElementReference {
285                source_id: Arc::from(self.source_id.as_str()),
286                element_id: start_node_id.into(),
287            },
288        };
289
290        self.send(SourceChange::Insert { element }).await
291    }
292
293    /// Send a batch of source changes efficiently
294    pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
295        for change in changes {
296            self.send(change).await?;
297        }
298        Ok(())
299    }
300
301    /// Get the source ID that this handle is connected to
302    pub fn source_id(&self) -> &str {
303        &self.source_id
304    }
305}
306
307/// A source that allows applications to programmatically inject events.
308///
309/// This source receives events from an [`ApplicationSourceHandle`] and forwards
310/// them to the Drasi query processing pipeline.
311///
312/// # Fields
313///
314/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
315/// - `config`: Application source configuration
316/// - `app_rx`: Receiver for events from the handle
317/// - `app_tx`: Sender for creating additional handles
318pub struct ApplicationSource {
319    /// Base source implementation providing common functionality
320    base: SourceBase,
321    /// Application source configuration
322    config: ApplicationSourceConfig,
323    /// Receiver for events from handles (taken when processing starts)
324    app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
325    /// Sender for creating new handles
326    app_tx: mpsc::Sender<SourceChange>,
327}
328
329impl ApplicationSource {
330    /// Create a new application source and its handle.
331    ///
332    /// The event channel is automatically injected when the source is added
333    /// to DrasiLib via `add_source()`.
334    ///
335    /// # Arguments
336    ///
337    /// * `id` - Unique identifier for this source instance
338    /// * `config` - Application source configuration
339    ///
340    /// # Returns
341    ///
342    /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
343    /// can be used to send events to the source.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the base source cannot be initialized.
348    ///
349    /// # Example
350    ///
351    /// ```rust,ignore
352    /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
353    ///
354    /// let config = ApplicationSourceConfig::default();
355    /// let (source, handle) = ApplicationSource::new("my-source", config)?;
356    /// ```
357    pub fn new(
358        id: impl Into<String>,
359        config: ApplicationSourceConfig,
360    ) -> Result<(Self, ApplicationSourceHandle)> {
361        let id = id.into();
362        let params = SourceBaseParams::new(id.clone());
363        let (app_tx, app_rx) = mpsc::channel(1000);
364
365        let handle = ApplicationSourceHandle {
366            tx: app_tx.clone(),
367            source_id: id.clone(),
368        };
369
370        let source = Self {
371            base: SourceBase::new(params)?,
372            config,
373            app_rx: Arc::new(RwLock::new(Some(app_rx))),
374            app_tx,
375        };
376
377        Ok((source, handle))
378    }
379
380    /// Get a new handle for this source
381    pub fn get_handle(&self) -> ApplicationSourceHandle {
382        ApplicationSourceHandle {
383            tx: self.app_tx.clone(),
384            source_id: self.base.id.clone(),
385        }
386    }
387
388    async fn process_events(&self) -> Result<()> {
389        let mut rx = self
390            .app_rx
391            .write()
392            .await
393            .take()
394            .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
395
396        let source_name = self.base.id.clone();
397        let base_dispatchers = self.base.dispatchers.clone();
398        let status_tx = self.base.status_tx();
399        let status = self.base.status.clone();
400
401        let handle = tokio::spawn(async move {
402            info!("ApplicationSource '{source_name}' event processor started");
403
404            if let Some(ref tx) = *status_tx.read().await {
405                let _ = tx
406                    .send(ComponentEvent {
407                        component_id: source_name.clone(),
408                        component_type: ComponentType::Source,
409                        status: ComponentStatus::Running,
410                        timestamp: chrono::Utc::now(),
411                        message: Some("Processing events".to_string()),
412                    })
413                    .await;
414            }
415
416            *status.write().await = ComponentStatus::Running;
417
418            while let Some(change) = rx.recv().await {
419                debug!("ApplicationSource '{source_name}' received event: {change:?}");
420
421                let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
422                profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
423
424                let wrapper = SourceEventWrapper::with_profiling(
425                    source_name.clone(),
426                    SourceEvent::Change(change),
427                    chrono::Utc::now(),
428                    profiling,
429                );
430
431                if let Err(e) =
432                    SourceBase::dispatch_from_task(base_dispatchers.clone(), wrapper, &source_name)
433                        .await
434                {
435                    debug!("Failed to dispatch change (no subscribers): {e}");
436                }
437            }
438
439            info!("ApplicationSource '{source_name}' event processor stopped");
440        });
441
442        *self.base.task_handle.write().await = Some(handle);
443        Ok(())
444    }
445}
446
447#[async_trait]
448impl Source for ApplicationSource {
449    fn id(&self) -> &str {
450        &self.base.id
451    }
452
453    fn type_name(&self) -> &str {
454        "application"
455    }
456
457    fn properties(&self) -> HashMap<String, serde_json::Value> {
458        self.config.properties.clone()
459    }
460
461    fn auto_start(&self) -> bool {
462        self.base.get_auto_start()
463    }
464
465    async fn start(&self) -> Result<()> {
466        info!("Starting ApplicationSource '{}'", self.base.id);
467
468        self.base
469            .set_status_with_event(
470                ComponentStatus::Starting,
471                Some("Starting application source".to_string()),
472            )
473            .await?;
474
475        self.process_events().await?;
476
477        Ok(())
478    }
479
480    async fn stop(&self) -> Result<()> {
481        info!("Stopping ApplicationSource '{}'", self.base.id);
482
483        self.base
484            .set_status_with_event(
485                ComponentStatus::Stopping,
486                Some("Stopping application source".to_string()),
487            )
488            .await?;
489
490        if let Some(handle) = self.base.task_handle.write().await.take() {
491            handle.abort();
492        }
493
494        self.base
495            .set_status_with_event(
496                ComponentStatus::Stopped,
497                Some("Application source stopped".to_string()),
498            )
499            .await?;
500
501        Ok(())
502    }
503
504    async fn status(&self) -> ComponentStatus {
505        self.base.status.read().await.clone()
506    }
507
508    async fn subscribe(
509        &self,
510        settings: drasi_lib::config::SourceSubscriptionSettings,
511    ) -> Result<SubscriptionResponse> {
512        self.base
513            .subscribe_with_bootstrap(&settings, "Application")
514            .await
515    }
516
517    fn as_any(&self) -> &dyn std::any::Any {
518        self
519    }
520
521    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
522        self.base.initialize(context).await;
523    }
524
525    async fn set_bootstrap_provider(
526        &self,
527        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
528    ) {
529        self.base.set_bootstrap_provider(provider).await;
530    }
531}