drasi_source_application/lib.rs
1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Application Source Plugin for Drasi
17//!
18//! This plugin enables programmatic event injection into Drasi's continuous query
19//! processing pipeline. Unlike other sources that connect to external data systems,
20//! the application source allows your Rust code to directly send graph data changes.
21//!
22//! # Architecture
23//!
24//! The application source uses a handle-based pattern:
25//! - **`ApplicationSource`**: The source component that processes events
26//! - **`ApplicationSourceHandle`**: A clonable handle for sending events from anywhere in your code
27//!
28//! # API Overview
29//!
30//! The `ApplicationSourceHandle` provides high-level methods for common operations:
31//!
32//! - [`send_node_insert`](ApplicationSourceHandle::send_node_insert) - Insert a new node
33//! - [`send_node_update`](ApplicationSourceHandle::send_node_update) - Update an existing node
34//! - [`send_delete`](ApplicationSourceHandle::send_delete) - Delete a node or relation
35//! - [`send_relation_insert`](ApplicationSourceHandle::send_relation_insert) - Insert a relationship
36//! - [`send_batch`](ApplicationSourceHandle::send_batch) - Send multiple changes efficiently
37//! - [`send`](ApplicationSourceHandle::send) - Send a raw `SourceChange` event
38//!
39//! # Building Properties
40//!
41//! Use the [`PropertyMapBuilder`] to construct property maps fluently:
42//!
43//! ```rust,ignore
44//! use drasi_source_application::PropertyMapBuilder;
45//!
46//! let props = PropertyMapBuilder::new()
47//! .string("name", "Alice")
48//! .integer("age", 30)
49//! .float("score", 95.5)
50//! .bool("active", true)
51//! .build();
52//! ```
53//!
54//! # Configuration
55//!
56//! The application source has minimal configuration since it receives events programmatically
57//! rather than connecting to an external system.
58//!
59//! | Field | Type | Default | Description |
60//! |-------|------|---------|-------------|
61//! | `properties` | object | `{}` | Custom properties (passed through to `properties()`) |
62//!
63//! # Bootstrap Support
64//!
65//! The application source supports pluggable bootstrap providers via the `BootstrapProvider`
66//! trait. Configure a bootstrap provider using `set_bootstrap_provider()` or through the
67//! builder pattern. Common options include `ApplicationBootstrapProvider` for replaying
68//! stored events, or any other `BootstrapProvider` implementation.
69//!
70//! # Example Configuration (YAML)
71//!
72//! ```yaml
73//! source_type: application
74//! properties: {}
75//! ```
76//!
77//! # Usage Example
78//!
79//! ```rust,ignore
80//! use drasi_source_application::{
81//! ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle,
82//! PropertyMapBuilder
83//! };
84//! use std::sync::Arc;
85//!
86//! // Create the source and handle
87//! let config = ApplicationSourceConfig::default();
88//! let (source, handle) = ApplicationSource::new("my-app-source", config)?;
89//!
90//! // Add source to Drasi
91//! let source = Arc::new(source);
92//! drasi.add_source(source).await?;
93//!
94//! // Clone handle for use in different parts of your application
95//! let handle_clone = handle.clone();
96//!
97//! // Insert a node
98//! let props = PropertyMapBuilder::new()
99//! .string("name", "Alice")
100//! .integer("age", 30)
101//! .build();
102//!
103//! handle.send_node_insert("user-1", vec!["User"], props).await?;
104//!
105//! // Insert a relationship
106//! let rel_props = PropertyMapBuilder::new()
107//! .string("since", "2024-01-01")
108//! .build();
109//!
110//! handle.send_relation_insert(
111//! "follows-1",
112//! vec!["FOLLOWS"],
113//! rel_props,
114//! "user-1", // start node
115//! "user-2", // end node
116//! ).await?;
117//!
118//! // Update a node
119//! let updated_props = PropertyMapBuilder::new()
120//! .integer("age", 31)
121//! .build();
122//!
123//! handle.send_node_update("user-1", vec!["User"], updated_props).await?;
124//!
125//! // Delete a node
126//! handle.send_delete("user-1", vec!["User"]).await?;
127//! ```
128//!
129//! # Use Cases
130//!
131//! - **Testing**: Inject test data directly without setting up external sources
132//! - **Integration**: Bridge between your application logic and Drasi queries
133//! - **Simulation**: Generate synthetic events for development and demos
134//! - **Hybrid Sources**: Combine with other sources for complex data pipelines
135
136pub mod config;
137pub use config::ApplicationSourceConfig;
138
139mod property_builder;
140mod time;
141
142#[cfg(test)]
143mod tests;
144
145pub use property_builder::PropertyMapBuilder;
146
147use anyhow::Result;
148use async_trait::async_trait;
149use log::{debug, info, warn};
150use std::collections::HashMap;
151use std::sync::Arc;
152use tokio::sync::{mpsc, RwLock};
153
154use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
155use drasi_lib::channels::{ComponentStatus, ComponentType, *};
156use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
157use drasi_lib::Source;
158use tracing::Instrument;
159
160/// Handle for programmatic event injection into an Application Source
161///
162/// `ApplicationSourceHandle` provides a type-safe API for injecting graph data changes
163/// (node inserts, updates, deletes, and relationship inserts) directly from your application
164/// code into the Drasi continuous query processing pipeline.
165#[derive(Clone)]
166pub struct ApplicationSourceHandle {
167 tx: mpsc::Sender<SourceChange>,
168 source_id: String,
169}
170
171impl ApplicationSourceHandle {
172 /// Send a raw source change event
173 pub async fn send(&self, change: SourceChange) -> Result<()> {
174 self.tx
175 .send(change)
176 .await
177 .map_err(|_| anyhow::anyhow!("Failed to send event: channel closed"))?;
178 Ok(())
179 }
180
181 /// Insert a new node into the graph
182 pub async fn send_node_insert(
183 &self,
184 element_id: impl Into<Arc<str>>,
185 labels: Vec<impl Into<Arc<str>>>,
186 properties: drasi_core::models::ElementPropertyMap,
187 ) -> Result<()> {
188 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
189 warn!("Failed to get timestamp for node insert: {e}, using fallback");
190 chrono::Utc::now().timestamp_millis() as u64
191 });
192
193 let element = Element::Node {
194 metadata: ElementMetadata {
195 reference: ElementReference {
196 source_id: Arc::from(self.source_id.as_str()),
197 element_id: element_id.into(),
198 },
199 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
200 effective_from,
201 },
202 properties,
203 };
204
205 self.send(SourceChange::Insert { element }).await
206 }
207
208 /// Update an existing node in the graph
209 pub async fn send_node_update(
210 &self,
211 element_id: impl Into<Arc<str>>,
212 labels: Vec<impl Into<Arc<str>>>,
213 properties: drasi_core::models::ElementPropertyMap,
214 ) -> Result<()> {
215 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
216 warn!("Failed to get timestamp for node update: {e}, using fallback");
217 chrono::Utc::now().timestamp_millis() as u64
218 });
219
220 let element = Element::Node {
221 metadata: ElementMetadata {
222 reference: ElementReference {
223 source_id: Arc::from(self.source_id.as_str()),
224 element_id: element_id.into(),
225 },
226 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
227 effective_from,
228 },
229 properties,
230 };
231
232 self.send(SourceChange::Update { element }).await
233 }
234
235 /// Delete a node or relationship from the graph
236 pub async fn send_delete(
237 &self,
238 element_id: impl Into<Arc<str>>,
239 labels: Vec<impl Into<Arc<str>>>,
240 ) -> Result<()> {
241 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
242 warn!("Failed to get timestamp for delete: {e}, using fallback");
243 chrono::Utc::now().timestamp_millis() as u64
244 });
245
246 let metadata = ElementMetadata {
247 reference: ElementReference {
248 source_id: Arc::from(self.source_id.as_str()),
249 element_id: element_id.into(),
250 },
251 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
252 effective_from,
253 };
254
255 self.send(SourceChange::Delete { metadata }).await
256 }
257
258 /// Insert a new relationship into the graph
259 pub async fn send_relation_insert(
260 &self,
261 element_id: impl Into<Arc<str>>,
262 labels: Vec<impl Into<Arc<str>>>,
263 properties: drasi_core::models::ElementPropertyMap,
264 start_node_id: impl Into<Arc<str>>,
265 end_node_id: impl Into<Arc<str>>,
266 ) -> Result<()> {
267 let effective_from = crate::time::get_current_timestamp_millis().unwrap_or_else(|e| {
268 warn!("Failed to get timestamp for relation insert: {e}, using fallback");
269 chrono::Utc::now().timestamp_millis() as u64
270 });
271
272 let element = Element::Relation {
273 metadata: ElementMetadata {
274 reference: ElementReference {
275 source_id: Arc::from(self.source_id.as_str()),
276 element_id: element_id.into(),
277 },
278 labels: Arc::from(labels.into_iter().map(|l| l.into()).collect::<Vec<_>>()),
279 effective_from,
280 },
281 properties,
282 in_node: ElementReference {
283 source_id: Arc::from(self.source_id.as_str()),
284 element_id: start_node_id.into(),
285 },
286 out_node: ElementReference {
287 source_id: Arc::from(self.source_id.as_str()),
288 element_id: end_node_id.into(),
289 },
290 };
291
292 self.send(SourceChange::Insert { element }).await
293 }
294
295 /// Send a batch of source changes efficiently
296 pub async fn send_batch(&self, changes: Vec<SourceChange>) -> Result<()> {
297 for change in changes {
298 self.send(change).await?;
299 }
300 Ok(())
301 }
302
303 /// Get the source ID that this handle is connected to
304 pub fn source_id(&self) -> &str {
305 &self.source_id
306 }
307}
308
309/// A source that allows applications to programmatically inject events.
310///
311/// This source receives events from an [`ApplicationSourceHandle`] and forwards
312/// them to the Drasi query processing pipeline.
313///
314/// # Fields
315///
316/// - `base`: Common source functionality (dispatchers, status, lifecycle, bootstrap)
317/// - `config`: Application source configuration
318/// - `app_rx`: Receiver for events from the handle
319/// - `app_tx`: Sender for creating additional handles
320pub struct ApplicationSource {
321 /// Base source implementation providing common functionality
322 base: SourceBase,
323 /// Application source configuration
324 config: ApplicationSourceConfig,
325 /// Receiver for events from handles (taken when processing starts)
326 app_rx: Arc<RwLock<Option<mpsc::Receiver<SourceChange>>>>,
327 /// Sender for creating new handles
328 app_tx: mpsc::Sender<SourceChange>,
329}
330
331impl ApplicationSource {
332 /// Create a new application source and its handle.
333 ///
334 /// The event channel is automatically injected when the source is added
335 /// to DrasiLib via `add_source()`.
336 ///
337 /// # Arguments
338 ///
339 /// * `id` - Unique identifier for this source instance
340 /// * `config` - Application source configuration
341 ///
342 /// # Returns
343 ///
344 /// A tuple of `(ApplicationSource, ApplicationSourceHandle)` where the handle
345 /// can be used to send events to the source.
346 ///
347 /// # Errors
348 ///
349 /// Returns an error if the base source cannot be initialized.
350 ///
351 /// # Example
352 ///
353 /// ```rust,ignore
354 /// use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
355 ///
356 /// let config = ApplicationSourceConfig::default();
357 /// let (source, handle) = ApplicationSource::new("my-source", config)?;
358 /// ```
359 pub fn new(
360 id: impl Into<String>,
361 config: ApplicationSourceConfig,
362 ) -> Result<(Self, ApplicationSourceHandle)> {
363 let id = id.into();
364 let params = SourceBaseParams::new(id.clone());
365 let (app_tx, app_rx) = mpsc::channel(1000);
366
367 let handle = ApplicationSourceHandle {
368 tx: app_tx.clone(),
369 source_id: id.clone(),
370 };
371
372 let source = Self {
373 base: SourceBase::new(params)?,
374 config,
375 app_rx: Arc::new(RwLock::new(Some(app_rx))),
376 app_tx,
377 };
378
379 Ok((source, handle))
380 }
381
382 /// Get a new handle for this source
383 pub fn get_handle(&self) -> ApplicationSourceHandle {
384 ApplicationSourceHandle {
385 tx: self.app_tx.clone(),
386 source_id: self.base.id.clone(),
387 }
388 }
389
390 async fn process_events(&self) -> Result<()> {
391 let mut rx = self
392 .app_rx
393 .write()
394 .await
395 .take()
396 .ok_or_else(|| anyhow::anyhow!("Receiver already taken"))?;
397
398 let source_name = self.base.id.clone();
399 let base_dispatchers = self.base.dispatchers.clone();
400 let status_tx = self.base.status_tx();
401 let status = self.base.status.clone();
402 let source_id = self.base.id.clone();
403
404 // Get instance_id from context for log routing isolation
405 let instance_id = self
406 .base
407 .context()
408 .await
409 .map(|c| c.instance_id)
410 .unwrap_or_default();
411
412 let span = tracing::info_span!(
413 "application_source_processor",
414 instance_id = %instance_id,
415 component_id = %source_id,
416 component_type = "source"
417 );
418 let handle = tokio::spawn(
419 async move {
420 info!("ApplicationSource '{source_name}' event processor started");
421
422 if let Some(ref tx) = *status_tx.read().await {
423 let _ = tx
424 .send(ComponentEvent {
425 component_id: source_name.clone(),
426 component_type: ComponentType::Source,
427 status: ComponentStatus::Running,
428 timestamp: chrono::Utc::now(),
429 message: Some("Processing events".to_string()),
430 })
431 .await;
432 }
433
434 *status.write().await = ComponentStatus::Running;
435
436 while let Some(change) = rx.recv().await {
437 debug!("ApplicationSource '{source_name}' received event: {change:?}");
438
439 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
440 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
441
442 let wrapper = SourceEventWrapper::with_profiling(
443 source_name.clone(),
444 SourceEvent::Change(change),
445 chrono::Utc::now(),
446 profiling,
447 );
448
449 if let Err(e) = SourceBase::dispatch_from_task(
450 base_dispatchers.clone(),
451 wrapper,
452 &source_name,
453 )
454 .await
455 {
456 debug!("Failed to dispatch change (no subscribers): {e}");
457 }
458 }
459
460 info!("ApplicationSource '{source_name}' event processor stopped");
461 }
462 .instrument(span),
463 );
464
465 *self.base.task_handle.write().await = Some(handle);
466 Ok(())
467 }
468}
469
470#[async_trait]
471impl Source for ApplicationSource {
472 fn id(&self) -> &str {
473 &self.base.id
474 }
475
476 fn type_name(&self) -> &str {
477 "application"
478 }
479
480 fn properties(&self) -> HashMap<String, serde_json::Value> {
481 self.config.properties.clone()
482 }
483
484 fn auto_start(&self) -> bool {
485 self.base.get_auto_start()
486 }
487
488 async fn start(&self) -> Result<()> {
489 info!("Starting ApplicationSource '{}'", self.base.id);
490
491 self.base
492 .set_status_with_event(
493 ComponentStatus::Starting,
494 Some("Starting application source".to_string()),
495 )
496 .await?;
497
498 self.process_events().await?;
499
500 Ok(())
501 }
502
503 async fn stop(&self) -> Result<()> {
504 info!("Stopping ApplicationSource '{}'", self.base.id);
505
506 self.base
507 .set_status_with_event(
508 ComponentStatus::Stopping,
509 Some("Stopping application source".to_string()),
510 )
511 .await?;
512
513 if let Some(handle) = self.base.task_handle.write().await.take() {
514 handle.abort();
515 }
516
517 self.base
518 .set_status_with_event(
519 ComponentStatus::Stopped,
520 Some("Application source stopped".to_string()),
521 )
522 .await?;
523
524 Ok(())
525 }
526
527 async fn status(&self) -> ComponentStatus {
528 self.base.status.read().await.clone()
529 }
530
531 async fn subscribe(
532 &self,
533 settings: drasi_lib::config::SourceSubscriptionSettings,
534 ) -> Result<SubscriptionResponse> {
535 self.base
536 .subscribe_with_bootstrap(&settings, "Application")
537 .await
538 }
539
540 fn as_any(&self) -> &dyn std::any::Any {
541 self
542 }
543
544 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
545 self.base.initialize(context).await;
546 }
547
548 async fn set_bootstrap_provider(
549 &self,
550 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
551 ) {
552 self.base.set_bootstrap_provider(provider).await;
553 }
554}