drasi_source_application/lib.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Application Source Plugin for Drasi
16//!
17//! This plugin enables programmatic event injection into Drasi's continuous query
18//! processing pipeline. Unlike other sources that connect to external data systems,
19//! the application source allows your Rust code to directly send graph data changes.
20//!
21//! # Architecture
22//!
23//! The application source uses a handle-based pattern:
24//! - **`ApplicationSource`**: The source component that processes events
25//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
26//!
27//! # API Overview
28//!
29//! The `ApplicationSourceHandle` provides high-level methods for common operations:
30//!
31//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
32//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
33//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
34//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes in order with a single call
36//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
37//!
38//! # Building Properties
39//!
40//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
41//!
42//! ```rust,ignore
43//! use drasi_source_application::PropertyMapBuilder;
44//!
45//! let props = PropertyMapBuilder::new()
46//! .string("name", "Alice")
47//! .integer("age", 30)
48//! .float("score", 95.5)
49//! .bool("active", true)
50//! .build();
51//! ```
52//!
53//! # Configuration
54//!
55//! The application source has minimal configuration since it receives events programmatically
56//! rather than connecting to an external system.
57//!
58//! | Field | Type | Default | Description |
59//! |-------|------|---------|-------------|
60//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
61//!
62//! # Bootstrap Support
63//!
64//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
65//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
66//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
67//! stored events, or any other `BootstrapProvider` implementation.
68//!
69//! # Example Configuration (YAML)
70//!
71//! ```yaml
72//! source_type: application
73//! properties: {}
74//! ```
75//!
76//! # Usage Example
77//!
78//! ```rust,ignore
79//! use drasi_source_application::{
80//! ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
81//! PropertyMapBuilder
82//! };
83//! use std::sync::Arc;
84//!
85//! // Create the source and handle
86//! let config = ApplicationSourceConfig::default();
87//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
88//!
89//! // Add source to Drasi
90//! let source = Arc::new(source);
91//! drasi.add_source(source).await?;
92//!
93//! // Clone handle for use in different parts of your application
94//! let handle_clone = handle.clone();
95//!
96//! // Insert a node
97//! let props = PropertyMapBuilder::new()
98//! .string("name", "Alice")
99//! .integer("age", 30)
100//! .build();
101//!
102//! handle.send_node_insert("user-1", vec!["User"], props).await?;
103//!
104//! // Insert a relationship
105//! let rel_props = PropertyMapBuilder::new()
106//! .string("since", "2024-01-01")
107//! .build();
108//!
109//! handle.send_relation_insert(
110//! "follows-1",
111//! vec!["FOLLOWS"],
112//! rel_props,
113//! "user-1", // start node
114//! "user-2", // end node
115//! ).await?;
116//!
117//! // Update a node
118//! let updated_props = PropertyMapBuilder::new()
119//! .integer("age", 31)
120//! .build();
121//!
122//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
123//!
124//! // Delete a node
125//! handle.send_delete("user-1", vec!["User"]).await?;
126//! ```
127//!
128//! # Use Cases
129//!
130//! - **Testing**: Inject test data directly without setting up external sources
131//! - **Integration**: Bridge between your application logic and Drasi queries
132//! - **Simulation**: Generate synthetic events for development and demos
133//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
134
135pub mod config;
136pub use config::ApplicationSourceConfig;
137
138mod property_builder;
139mod time;
140
141#[cfg(test)]
142mod tests;
143
144pub use property_builder::PropertyMapBuilder;
145
146use anyhow::Result;
147use async_trait::async_trait;
148use log::{debug, info, warn};
149use std::collections::HashMap;
150use std::sync::Arc;
151use tokio::sync::{mpsc, RwLock};
152
153use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
154use drasi_lib::channels::{ComponentEventSender, ComponentStatus, ComponentType, *};
155use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
156use drasi_lib::Source;
157
/// Handle for programmatic event injection into an Application Source
///
/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
/// (node inserts, updates, deletes, and relationship inserts) directly from your application
/// code into the Drasi continuous query processing pipeline.
///
/// The handle derives `Clone`; each clone holds a clone of the same channel sender and a
/// copy of the same source ID, so all clones feed the same source.
#[derive(Clone)]
pub struct ApplicationSourceHandle {
    /// Sending half of the channel on which `SourceChange` values are queued for the source.
    tx: mpsc::Sender<SourceChange>,
    /// ID of the source this handle belongs to; used as the `source_id` of every
    /// element reference built by the `send_*` helper methods.
    source_id: String,
}
168
169impl ApplicationSourceHandle {
170 /// Send a raw source change event
171 pub async fn send(&self, change: SourceChange) -> Result<()> {
172 self.tx
173 .send(change)
174 .await
175 .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
176 Ok(())
177 }
178
179 /// Insert a new node into the graph
180 pub async fn send_node_insert(
181 &self,
182 element_id: impl Into<Arc<str>>,
183 labels: Vec<impl Into<Arc<str>>>,
184 properties: drasi_core::models::ElementPropertyMap,
185 ) -> Result<()> {
186 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
187 warn!("Failed to get timestamp for node insert: {e}, using fallback");
188 chrono::Utc::now().timestamp_millis() as u64
189 });
190
191 let element = Element::Node {
192 metadata: ElementMetadata {
193 reference: ElementReference {
194 source_id: Arc::from(self.source_id.as_str()),
195 element_id: element_id.into(),
196 },
197 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
198 effective_from,
199 },
200 properties,
201 };
202
203 self.send(SourceChange::Insert { element }).await
204 }
205
206 /// Update an existing node in the graph
207 pub async fn send_node_update(
208 &self,
209 element_id: impl Into<Arc<str>>,
210 labels: Vec<impl Into<Arc<str>>>,
211 properties: drasi_core::models::ElementPropertyMap,
212 ) -> Result<()> {
213 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
214 warn!("Failed to get timestamp for node update: {e}, using fallback");
215 chrono::Utc::now().timestamp_millis() as u64
216 });
217
218 let element = Element::Node {
219 metadata: ElementMetadata {
220 reference: ElementReference {
221 source_id: Arc::from(self.source_id.as_str()),
222 element_id: element_id.into(),
223 },
224 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
225 effective_from,
226 },
227 properties,
228 };
229
230 self.send(SourceChange::Update { element }).await
231 }
232
233 /// Delete a node or relationship from the graph
234 pub async fn send_delete(
235 &self,
236 element_id: impl Into<Arc<str>>,
237 labels: Vec<impl Into<Arc<str>>>,
238 ) -> Result<()> {
239 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
240 warn!("Failed to get timestamp for delete: {e}, using fallback");
241 chrono::Utc::now().timestamp_millis() as u64
242 });
243
244 let metadata = ElementMetadata {
245 reference: ElementReference {
246 source_id: Arc::from(self.source_id.as_str()),
247 element_id: element_id.into(),
248 },
249 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
250 effective_from,
251 };
252
253 self.send(SourceChange::Delete { metadata }).await
254 }
255
256 /// Insert a new relationship into the graph
257 pub async fn send_relation_insert(
258 &self,
259 element_id: impl Into<Arc<str>>,
260 labels: Vec<impl Into<Arc<str>>>,
261 properties: drasi_core::models::ElementPropertyMap,
262 start_node_id: impl Into<Arc<str>>,
263 end_node_id: impl Into<Arc<str>>,
264 ) -> Result<()> {
265 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
266 warn!("Failed to get timestamp for relation insert: {e}, using fallback");
267 chrono::Utc::now().timestamp_millis() as u64
268 });
269
270 let element = Element::Relation {
271 metadata: ElementMetadata {
272 reference: ElementReference {
273 source_id: Arc::from(self.source_id.as_str()),
274 element_id: element_id.into(),
275 },
276 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
277 effective_from,
278 },
279 properties,
280 in_node: ElementReference {
281 source_id: Arc::from(self.source_id.as_str()),
282 element_id: end_node_id.into(),
283 },
284 out_node: ElementReference {
285 source_id: Arc::from(self.source_id.as_str()),
286 element_id: start_node_id.into(),
287 },
288 };
289
290 self.send(SourceChange::Insert { element }).await
291 }
292
293 /// Send a batch of source changes efficiently
294 pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
295 for change in changes {
296 self.send(change).await?;
297 }
298 Ok(())
299 }
300
301 /// Get the source ID that this handle is connected to
302 pub fn source_id(&self) -> &str {
303 &self.source_id
304 }
305}
306
/// A source that allows applications to programmatically inject events.
///
/// This source receives events from an [`ApplicationSourceHandle`] and forwards
/// them to the Drasi query processing pipeline.
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
/// - `config`: Application source configuration
/// - `app_rx`: Receiver for events from the handle
/// - `app_tx`: Sender for creating additional handles
pub struct ApplicationSource {
    /// Base source implementation providing common functionality
    base: SourceBase,
    /// Application source configuration
    config: ApplicationSourceConfig,
    /// Receiver for events from handles (taken when processing starts).
    /// Wrapped in an `Option` so event processing can move the receiver out; once it
    /// has been taken the slot holds `None`.
    app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
    /// Sender for creating new handles; cloned into each handle returned by `get_handle`.
    app_tx: mpsc::Sender<SourceChange>,
}
328
329impl ApplicationSource {
330 /// Create a new application source and its handle.
331 ///
332 /// The event channel is automatically injected when the source is added
333 /// to DrasiLib via `add_source()`.
334 ///
335 /// # Arguments
336 ///
337 /// * `id` - Unique identifier for this source instance
338 /// * `config` - Application source configuration
339 ///
340 /// # Returns
341 ///
342 /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
343 /// can be used to send events to the source.
344 ///
345 /// # Errors
346 ///
347 /// Returns an error if the base source cannot be initialized.
348 ///
349 /// # Example
350 ///
351 /// ```rust,ignore
352 /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
353 ///
354 /// let config = ApplicationSourceConfig::default();
355 /// let (source, handle) = ApplicationSource::new("my-source", config)?;
356 /// ```
357 pub fn new(
358 id: impl Into<String>,
359 config: ApplicationSourceConfig,
360 ) -> Result<(Self, ApplicationSourceHandle)> {
361 let id = id.into();
362 let params = SourceBaseParams::new(id.clone());
363 let (app_tx, app_rx) = mpsc::channel(1000);
364
365 let handle = ApplicationSourceHandle {
366 tx: app_tx.clone(),
367 source_id: id.clone(),
368 };
369
370 let source = Self {
371 base: SourceBase::new(params)?,
372 config,
373 app_rx: Arc::new(RwLock::new(Some(app_rx))),
374 app_tx,
375 };
376
377 Ok((source, handle))
378 }
379
380 /// Get a new handle for this source
381 pub fn get_handle(&self) -> ApplicationSourceHandle {
382 ApplicationSourceHandle {
383 tx: self.app_tx.clone(),
384 source_id: self.base.id.clone(),
385 }
386 }
387
388 async fn process_events(&self) -> Result<()> {
389 let mut rx = self
390 .app_rx
391 .write()
392 .await
393 .take()
394 .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
395
396 let source_name = self.base.id.clone();
397 let base_dispatchers = self.base.dispatchers.clone();
398 let status_tx = self.base.status_tx();
399 let status = self.base.status.clone();
400
401 let handle = tokio::spawn(async move {
402 info!("ApplicationSource '{source_name}' event processor started");
403
404 if let Some(ref tx) = *status_tx.read().await {
405 let _ = tx
406 .send(ComponentEvent {
407 component_id: source_name.clone(),
408 component_type: ComponentType::Source,
409 status: ComponentStatus::Running,
410 timestamp: chrono::Utc::now(),
411 message: Some("Processing events".to_string()),
412 })
413 .await;
414 }
415
416 *status.write().await = ComponentStatus::Running;
417
418 while let Some(change) = rx.recv().await {
419 debug!("ApplicationSource '{source_name}' received event: {change:?}");
420
421 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
422 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
423
424 let wrapper = SourceEventWrapper::with_profiling(
425 source_name.clone(),
426 SourceEvent::Change(change),
427 chrono::Utc::now(),
428 profiling,
429 );
430
431 if let Err(e) =
432 SourceBase::dispatch_from_task(base_dispatchers.clone(), wrapper, &source_name)
433 .await
434 {
435 debug!("Failed to dispatch change (no subscribers): {e}");
436 }
437 }
438
439 info!("ApplicationSource '{source_name}' event processor stopped");
440 });
441
442 *self.base.task_handle.write().await = Some(handle);
443 Ok(())
444 }
445}
446
447#[async_trait]
448impl Source for ApplicationSource {
449 fn id(&self) -> &str {
450 &self.base.id
451 }
452
453 fn type_name(&self) -> &str {
454 "application"
455 }
456
457 fn properties(&self) -> HashMap<String, serde_json::Value> {
458 self.config.properties.clone()
459 }
460
461 fn auto_start(&self) -> bool {
462 self.base.get_auto_start()
463 }
464
465 async fn start(&self) -> Result<()> {
466 info!("Starting ApplicationSource '{}'", self.base.id);
467
468 self.base
469 .set_status_with_event(
470 ComponentStatus::Starting,
471 Some("Starting application source".to_string()),
472 )
473 .await?;
474
475 self.process_events().await?;
476
477 Ok(())
478 }
479
480 async fn stop(&self) -> Result<()> {
481 info!("Stopping ApplicationSource '{}'", self.base.id);
482
483 self.base
484 .set_status_with_event(
485 ComponentStatus::Stopping,
486 Some("Stopping application source".to_string()),
487 )
488 .await?;
489
490 if let Some(handle) = self.base.task_handle.write().await.take() {
491 handle.abort();
492 }
493
494 self.base
495 .set_status_with_event(
496 ComponentStatus::Stopped,
497 Some("Application source stopped".to_string()),
498 )
499 .await?;
500
501 Ok(())
502 }
503
504 async fn status(&self) -> ComponentStatus {
505 self.base.status.read().await.clone()
506 }
507
508 async fn subscribe(
509 &self,
510 settings: drasi_lib::config::SourceSubscriptionSettings,
511 ) -> Result<SubscriptionResponse> {
512 self.base
513 .subscribe_with_bootstrap(&settings, "Application")
514 .await
515 }
516
517 fn as_any(&self) -> &dyn std::any::Any {
518 self
519 }
520
521 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
522 self.base.initialize(context).await;
523 }
524
525 async fn set_bootstrap_provider(
526 &self,
527 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
528 ) {
529 self.base.set_bootstrap_provider(provider).await;
530 }
531}