drasi_source_application/lib.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Application Source Plugin for Drasi
16//!
17//! This plugin enables programmatic event injection into Drasi's continuous query
18//! processing pipeline. Unlike other sources that connect to external data systems,
19//! the application source allows your Rust code to directly send graph data changes.
20//!
21//! # Architecture
22//!
23//! The application source uses a handle-based pattern:
24//! - **`ApplicationSource`**: The source component that processes events
25//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
26//!
27//! # API Overview
28//!
29//! The `ApplicationSourceHandle` provides high-level methods for common operations:
30//!
31//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
32//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
33//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
34//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
35//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes in order, one at a time
36//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
37//!
38//! # Building Properties
39//!
40//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
41//!
42//! ```rust,ignore
43//! use drasi_source_application::PropertyMapBuilder;
44//!
45//! let props = PropertyMapBuilder::new()
46//! .string("name", "Alice")
47//! .integer("age", 30)
48//! .float("score", 95.5)
49//! .bool("active", true)
50//! .build();
51//! ```
52//!
53//! # Configuration
54//!
55//! The application source has minimal configuration since it receives events programmatically
56//! rather than connecting to an external system.
57//!
58//! | Field | Type | Default | Description |
59//! |-------|------|---------|-------------|
60//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
61//!
62//! # Bootstrap Support
63//!
64//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
65//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
66//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
67//! stored events, or any other `BootstrapProvider` implementation.
68//!
69//! # Example Configuration (YAML)
70//!
71//! ```yaml
72//! source_type: application
73//! properties: {}
74//! ```
75//!
76//! # Usage Example
77//!
78//! ```rust,ignore
79//! use drasi_source_application::{
80//! ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
81//! PropertyMapBuilder
82//! };
83//! use std::sync::Arc;
84//!
85//! // Create the source and handle
86//! let config = ApplicationSourceConfig::default();
87//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
88//!
89//! // Add source to Drasi
90//! let source = Arc::new(source);
91//! drasi.add_source(source).await?;
92//!
93//! // Clone handle for use in different parts of your application
94//! let handle_clone = handle.clone();
95//!
96//! // Insert a node
97//! let props = PropertyMapBuilder::new()
98//! .string("name", "Alice")
99//! .integer("age", 30)
100//! .build();
101//!
102//! handle.send_node_insert("user-1", vec!["User"], props).await?;
103//!
104//! // Insert a relationship
105//! let rel_props = PropertyMapBuilder::new()
106//! .string("since", "2024-01-01")
107//! .build();
108//!
109//! handle.send_relation_insert(
110//! "follows-1",
111//! vec!["FOLLOWS"],
112//! rel_props,
113//! "user-1", // start node
114//! "user-2", // end node
115//! ).await?;
116//!
117//! // Update a node
118//! let updated_props = PropertyMapBuilder::new()
119//! .integer("age", 31)
120//! .build();
121//!
122//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
123//!
124//! // Delete a node
125//! handle.send_delete("user-1", vec!["User"]).await?;
126//! ```
127//!
128//! # Use Cases
129//!
130//! - **Testing**: Inject test data directly without setting up external sources
131//! - **Integration**: Bridge between your application logic and Drasi queries
132//! - **Simulation**: Generate synthetic events for development and demos
133//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
134
135pub mod config;
136pub use config::ApplicationSourceConfig;
137
138mod property_builder;
139mod time;
140
141#[cfg(test)]
142mod tests;
143
144pub use property_builder::PropertyMapBuilder;
145
146use anyhow::Result;
147use async_trait::async_trait;
148use log::{debug, info, warn};
149use std::collections::HashMap;
150use std::sync::Arc;
151use tokio::sync::{mpsc, RwLock};
152
153use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
154use drasi_lib::channels::{ComponentEventSender, ComponentStatus, ComponentType, *};
155use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
156use drasi_lib::Source;
157use tracing::Instrument;
158
159/// Handle for programmatic event injection into an Application Source
160///
161/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
162/// (node inserts, updates, deletes, and relationship inserts) directly from your application
163/// code into the Drasi continuous query processing pipeline.
164#[derive(Clone)]
165pub struct ApplicationSourceHandle {
166 tx: mpsc::Sender<SourceChange>,
167 source_id: String,
168}
169
170impl ApplicationSourceHandle {
171 /// Send a raw source change event
172 pub async fn send(&self, change: SourceChange) -> Result<()> {
173 self.tx
174 .send(change)
175 .await
176 .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
177 Ok(())
178 }
179
180 /// Insert a new node into the graph
181 pub async fn send_node_insert(
182 &self,
183 element_id: impl Into<Arc<str>>,
184 labels: Vec<impl Into<Arc<str>>>,
185 properties: drasi_core::models::ElementPropertyMap,
186 ) -> Result<()> {
187 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
188 warn!("Failed to get timestamp for node insert: {e}, using fallback");
189 chrono::Utc::now().timestamp_millis() as u64
190 });
191
192 let element = Element::Node {
193 metadata: ElementMetadata {
194 reference: ElementReference {
195 source_id: Arc::from(self.source_id.as_str()),
196 element_id: element_id.into(),
197 },
198 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
199 effective_from,
200 },
201 properties,
202 };
203
204 self.send(SourceChange::Insert { element }).await
205 }
206
207 /// Update an existing node in the graph
208 pub async fn send_node_update(
209 &self,
210 element_id: impl Into<Arc<str>>,
211 labels: Vec<impl Into<Arc<str>>>,
212 properties: drasi_core::models::ElementPropertyMap,
213 ) -> Result<()> {
214 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
215 warn!("Failed to get timestamp for node update: {e}, using fallback");
216 chrono::Utc::now().timestamp_millis() as u64
217 });
218
219 let element = Element::Node {
220 metadata: ElementMetadata {
221 reference: ElementReference {
222 source_id: Arc::from(self.source_id.as_str()),
223 element_id: element_id.into(),
224 },
225 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
226 effective_from,
227 },
228 properties,
229 };
230
231 self.send(SourceChange::Update { element }).await
232 }
233
234 /// Delete a node or relationship from the graph
235 pub async fn send_delete(
236 &self,
237 element_id: impl Into<Arc<str>>,
238 labels: Vec<impl Into<Arc<str>>>,
239 ) -> Result<()> {
240 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
241 warn!("Failed to get timestamp for delete: {e}, using fallback");
242 chrono::Utc::now().timestamp_millis() as u64
243 });
244
245 let metadata = ElementMetadata {
246 reference: ElementReference {
247 source_id: Arc::from(self.source_id.as_str()),
248 element_id: element_id.into(),
249 },
250 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
251 effective_from,
252 };
253
254 self.send(SourceChange::Delete { metadata }).await
255 }
256
257 /// Insert a new relationship into the graph
258 pub async fn send_relation_insert(
259 &self,
260 element_id: impl Into<Arc<str>>,
261 labels: Vec<impl Into<Arc<str>>>,
262 properties: drasi_core::models::ElementPropertyMap,
263 start_node_id: impl Into<Arc<str>>,
264 end_node_id: impl Into<Arc<str>>,
265 ) -> Result<()> {
266 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
267 warn!("Failed to get timestamp for relation insert: {e}, using fallback");
268 chrono::Utc::now().timestamp_millis() as u64
269 });
270
271 let element = Element::Relation {
272 metadata: ElementMetadata {
273 reference: ElementReference {
274 source_id: Arc::from(self.source_id.as_str()),
275 element_id: element_id.into(),
276 },
277 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
278 effective_from,
279 },
280 properties,
281 in_node: ElementReference {
282 source_id: Arc::from(self.source_id.as_str()),
283 element_id: end_node_id.into(),
284 },
285 out_node: ElementReference {
286 source_id: Arc::from(self.source_id.as_str()),
287 element_id: start_node_id.into(),
288 },
289 };
290
291 self.send(SourceChange::Insert { element }).await
292 }
293
294 /// Send a batch of source changes efficiently
295 pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
296 for change in changes {
297 self.send(change).await?;
298 }
299 Ok(())
300 }
301
302 /// Get the source ID that this handle is connected to
303 pub fn source_id(&self) -> &str {
304 &self.source_id
305 }
306}
307
308/// A source that allows applications to programmatically inject events.
309///
310/// This source receives events from an [`ApplicationSourceHandle`] and forwards
311/// them to the Drasi query processing pipeline.
312///
313/// # Fields
314///
315/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
316/// - `config`: Application source configuration
317/// - `app_rx`: Receiver for events from the handle
318/// - `app_tx`: Sender for creating additional handles
319pub struct ApplicationSource {
320 /// Base source implementation providing common functionality
321 base: SourceBase,
322 /// Application source configuration
323 config: ApplicationSourceConfig,
324 /// Receiver for events from handles (taken when processing starts)
325 app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
326 /// Sender for creating new handles
327 app_tx: mpsc::Sender<SourceChange>,
328}
329
330impl ApplicationSource {
331 /// Create a new application source and its handle.
332 ///
333 /// The event channel is automatically injected when the source is added
334 /// to DrasiLib via `add_source()`.
335 ///
336 /// # Arguments
337 ///
338 /// * `id` - Unique identifier for this source instance
339 /// * `config` - Application source configuration
340 ///
341 /// # Returns
342 ///
343 /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
344 /// can be used to send events to the source.
345 ///
346 /// # Errors
347 ///
348 /// Returns an error if the base source cannot be initialized.
349 ///
350 /// # Example
351 ///
352 /// ```rust,ignore
353 /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
354 ///
355 /// let config = ApplicationSourceConfig::default();
356 /// let (source, handle) = ApplicationSource::new("my-source", config)?;
357 /// ```
358 pub fn new(
359 id: impl Into<String>,
360 config: ApplicationSourceConfig,
361 ) -> Result<(Self, ApplicationSourceHandle)> {
362 let id = id.into();
363 let params = SourceBaseParams::new(id.clone());
364 let (app_tx, app_rx) = mpsc::channel(1000);
365
366 let handle = ApplicationSourceHandle {
367 tx: app_tx.clone(),
368 source_id: id.clone(),
369 };
370
371 let source = Self {
372 base: SourceBase::new(params)?,
373 config,
374 app_rx: Arc::new(RwLock::new(Some(app_rx))),
375 app_tx,
376 };
377
378 Ok((source, handle))
379 }
380
381 /// Get a new handle for this source
382 pub fn get_handle(&self) -> ApplicationSourceHandle {
383 ApplicationSourceHandle {
384 tx: self.app_tx.clone(),
385 source_id: self.base.id.clone(),
386 }
387 }
388
389 async fn process_events(&self) -> Result<()> {
390 let mut rx = self
391 .app_rx
392 .write()
393 .await
394 .take()
395 .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
396
397 let source_name = self.base.id.clone();
398 let base_dispatchers = self.base.dispatchers.clone();
399 let status_tx = self.base.status_tx();
400 let status = self.base.status.clone();
401 let source_id = self.base.id.clone();
402
403 // Get instance_id from context for log routing isolation
404 let instance_id = self
405 .base
406 .context()
407 .await
408 .map(|c| c.instance_id)
409 .unwrap_or_default();
410
411 let span = tracing::info_span!(
412 "application_source_processor",
413 instance_id = %instance_id,
414 component_id = %source_id,
415 component_type = "source"
416 );
417 let handle = tokio::spawn(
418 async move {
419 info!("ApplicationSource '{source_name}' event processor started");
420
421 if let Some(ref tx) = *status_tx.read().await {
422 let _ = tx
423 .send(ComponentEvent {
424 component_id: source_name.clone(),
425 component_type: ComponentType::Source,
426 status: ComponentStatus::Running,
427 timestamp: chrono::Utc::now(),
428 message: Some("Processing events".to_string()),
429 })
430 .await;
431 }
432
433 *status.write().await = ComponentStatus::Running;
434
435 while let Some(change) = rx.recv().await {
436 debug!("ApplicationSource '{source_name}' received event: {change:?}");
437
438 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
439 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
440
441 let wrapper = SourceEventWrapper::with_profiling(
442 source_name.clone(),
443 SourceEvent::Change(change),
444 chrono::Utc::now(),
445 profiling,
446 );
447
448 if let Err(e) = SourceBase::dispatch_from_task(
449 base_dispatchers.clone(),
450 wrapper,
451 &source_name,
452 )
453 .await
454 {
455 debug!("Failed to dispatch change (no subscribers): {e}");
456 }
457 }
458
459 info!("ApplicationSource '{source_name}' event processor stopped");
460 }
461 .instrument(span),
462 );
463
464 *self.base.task_handle.write().await = Some(handle);
465 Ok(())
466 }
467}
468
469#[async_trait]
470impl Source for ApplicationSource {
471 fn id(&self) -> &str {
472 &self.base.id
473 }
474
475 fn type_name(&self) -> &str {
476 "application"
477 }
478
479 fn properties(&self) -> HashMap<String, serde_json::Value> {
480 self.config.properties.clone()
481 }
482
483 fn auto_start(&self) -> bool {
484 self.base.get_auto_start()
485 }
486
487 async fn start(&self) -> Result<()> {
488 info!("Starting ApplicationSource '{}'", self.base.id);
489
490 self.base
491 .set_status_with_event(
492 ComponentStatus::Starting,
493 Some("Starting application source".to_string()),
494 )
495 .await?;
496
497 self.process_events().await?;
498
499 Ok(())
500 }
501
502 async fn stop(&self) -> Result<()> {
503 info!("Stopping ApplicationSource '{}'", self.base.id);
504
505 self.base
506 .set_status_with_event(
507 ComponentStatus::Stopping,
508 Some("Stopping application source".to_string()),
509 )
510 .await?;
511
512 if let Some(handle) = self.base.task_handle.write().await.take() {
513 handle.abort();
514 }
515
516 self.base
517 .set_status_with_event(
518 ComponentStatus::Stopped,
519 Some("Application source stopped".to_string()),
520 )
521 .await?;
522
523 Ok(())
524 }
525
526 async fn status(&self) -> ComponentStatus {
527 self.base.status.read().await.clone()
528 }
529
530 async fn subscribe(
531 &self,
532 settings: drasi_lib::config::SourceSubscriptionSettings,
533 ) -> Result<SubscriptionResponse> {
534 self.base
535 .subscribe_with_bootstrap(&settings, "Application")
536 .await
537 }
538
539 fn as_any(&self) -> &dyn std::any::Any {
540 self
541 }
542
543 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
544 self.base.initialize(context).await;
545 }
546
547 async fn set_bootstrap_provider(
548 &self,
549 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
550 ) {
551 self.base.set_bootstrap_provider(provider).await;
552 }
553}