pub struct DrasiLib { /* private fields */ }
Main server type - use DrasiLib::builder() to create instances
§Examples
use drasi_lib::DrasiLib;
let core = DrasiLib::builder()
.with_id("my-server")
.build()
.await?;
core.start().await?;
Core Drasi Server for continuous query processing
DrasiLib is the main entry point for embedding Drasi functionality in your application.
It manages sources (data ingestion), queries (continuous Cypher/GQL queries), and reactions
(output destinations) with a reactive event-driven architecture.
§Architecture
- Sources: Data ingestion points (PostgreSQL, HTTP, gRPC, Application, Mock, Platform)
- Queries: Continuous Cypher or GQL queries that process data changes in real-time
- Reactions: Output destinations that receive query results (HTTP, gRPC, Application, Log)
§Lifecycle States
The server progresses through these states:
- Created (via builder())
- Initialized (one-time setup, automatic with builder)
- Running (after start())
- Stopped (after stop(), can be restarted)
Components (sources, queries, reactions) have independent lifecycle states:
- Stopped: Component exists but is not processing
- Starting: Component is initializing
- Running: Component is actively processing
- Stopping: Component is shutting down
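The server-level lifecycle above can be read as a small state machine. The following is a minimal standalone sketch of that reading, not drasi_lib code: the `ServerState` enum and its methods are hypothetical names introduced only for illustration, and the rejected transitions mirror the error conditions documented for start() and stop().

```rust
/// Hypothetical model of the server lifecycle; not a drasi_lib type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerState {
    Created,
    Initialized,
    Running,
    Stopped,
}

impl ServerState {
    /// One-time setup; only valid directly after creation.
    pub fn initialize(self) -> Result<ServerState, String> {
        match self {
            ServerState::Created => Ok(ServerState::Initialized),
            other => Err(format!("cannot initialize from {:?}", other)),
        }
    }

    /// Starting is allowed after initialization or after a stop (restart).
    pub fn start(self) -> Result<ServerState, String> {
        match self {
            ServerState::Initialized | ServerState::Stopped => Ok(ServerState::Running),
            other => Err(format!("cannot start from {:?}", other)),
        }
    }

    /// Stopping is only valid while running.
    pub fn stop(self) -> Result<ServerState, String> {
        match self {
            ServerState::Running => Ok(ServerState::Stopped),
            other => Err(format!("cannot stop from {:?}", other)),
        }
    }
}
```

In this model a second start() on a running server is an error, while start() after stop() succeeds, which matches the "can be restarted" note in the list.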
§Thread Safety
DrasiLib is Clone (all clones share the same underlying state) and all methods
are thread-safe. You can safely share clones across threads and call methods concurrently.
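To illustrate what "all clones share the same underlying state" means in practice, here is a minimal sketch using only the standard library. `Handle` is a hypothetical stand-in, not the real DrasiLib type, and the counter is an assumption chosen only to make the shared state observable.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical cloneable handle; every clone points at the same counter.
#[derive(Clone, Default)]
pub struct Handle {
    started: Arc<Mutex<u32>>,
}

impl Handle {
    /// Record one event in the shared state.
    pub fn record_start(&self) {
        *self.started.lock().unwrap() += 1;
    }

    /// Read the shared counter.
    pub fn starts(&self) -> u32 {
        *self.started.lock().unwrap()
    }
}

/// Spawns `n` threads, each holding its own clone, and returns the total
/// observed through the original handle.
pub fn starts_across_threads(n: u32) -> u32 {
    let handle = Handle::default();
    let workers: Vec<_> = (0..n)
        .map(|_| {
            let h = handle.clone();
            thread::spawn(move || h.record_start())
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    handle.starts()
}
```

Because cloning copies only the Arc pointer, updates made through any clone are visible through every other clone.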
§Examples
§Builder Pattern
use drasi_lib::{DrasiLib, Query};
let core = DrasiLib::builder()
.with_id("my-server")
.with_source(my_source) // Pre-built source instance
.with_query(
Query::cypher("my-query")
.query("MATCH (n:Person) RETURN n.name, n.age")
.from_source("events")
.auto_start(true)
.build()
)
.with_reaction(my_reaction) // Pre-built reaction instance
.build()
.await?;
// Start all auto-start components
core.start().await?;
// List and inspect components
let sources = core.list_sources().await?;
let queries = core.list_queries().await?;
// Stop server
core.stop().await?;
§Dynamic Runtime Configuration
use drasi_lib::{DrasiLib, Query};
let core = DrasiLib::builder()
.with_id("dynamic-server")
.build()
.await?;
core.start().await?;
// Add components at runtime
core.add_source(new_source_instance).await?;
core.add_query(
Query::cypher("new-query")
.query("MATCH (n) RETURN n")
.from_source("new-source")
.auto_start(true)
.build()
).await?;
// Start/stop individual components
core.stop_query("new-query").await?;
core.start_query("new-query").await?;
// Remove components
core.remove_query("new-query").await?;
core.remove_source("new-source").await?;
§Restart Behavior
When you call stop() and then start() again, only components with auto_start=true
will be started. Components that were manually started (with auto_start=false) will
remain stopped:
core.start().await?;
// ... only auto_start=true components are running ...
core.stop().await?;
// ... all components stopped ...
core.start().await?;
// ... only auto_start=true components restarted ...
§Implementations
impl DrasiLib
pub async fn start(&self) -> Result<()>
Start the server and all auto-start components
This starts all components (sources, queries, reactions) that have auto_start set to true.
Components that were started manually (auto_start=false) before the last stop() are not restarted automatically; see Restart Behavior.
Components are started in dependency order: Sources → Queries → Reactions
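The dependency ordering can be sketched as a sort over component kinds. This is an illustrative model only: `Kind`, `start_order`, and `stop_order` are hypothetical names, and nothing here claims to reflect how drasi_lib orders components internally.

```rust
/// Hypothetical component kinds. The declaration order of the variants
/// encodes the dependency order Sources -> Queries -> Reactions, because
/// derived `Ord` on an enum follows variant order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Kind {
    Source,
    Query,
    Reaction,
}

/// Returns component ids in the order they would be started.
pub fn start_order(components: &[(&'static str, Kind)]) -> Vec<&'static str> {
    let mut sorted = components.to_vec();
    // The sort is stable, so registration order is kept within each kind.
    sorted.sort_by_key(|&(_, kind)| kind);
    sorted.into_iter().map(|(id, _)| id).collect()
}

/// The stop order documented for stop() is the start order reversed.
pub fn stop_order(components: &[(&'static str, Kind)]) -> Vec<&'static str> {
    let mut order = start_order(components);
    order.reverse();
    order
}
```

Starting sources first means a query never subscribes to a source that is not yet producing data, and reversing the order on shutdown lets consumers drain before their producers go away.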
§Errors
Returns an error if:
- The server is not initialized (DrasiError::InvalidState)
- The server is already running (anyhow::Error)
- Any component fails to start (propagated from component)
§Examples
let core = DrasiLib::builder()
.with_id("my-server")
.build()
.await?;
// Start server and all auto-start components
core.start().await?;
assert!(core.is_running().await);
pub async fn stop(&self) -> Result<()>
Stop the server and all running components
This stops all currently running components (sources, queries, reactions). Components are stopped in reverse dependency order: Reactions → Queries → Sources
On the next start(), only components with auto_start=true will be restarted.
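The restart rule can be made concrete with a toy model. Everything below is a hypothetical sketch (`Component`, `ToyServer`, and their methods are not drasi_lib types); it only models the auto_start rule stated above.

```rust
/// Hypothetical component record; not a drasi_lib type.
#[derive(Debug, Clone)]
pub struct Component {
    pub id: &'static str,
    pub auto_start: bool,
    pub running: bool,
}

/// Toy server that only models the auto_start rule.
#[derive(Debug, Default)]
pub struct ToyServer {
    pub components: Vec<Component>,
}

impl ToyServer {
    /// Register a component in the stopped state.
    pub fn add(&mut self, id: &'static str, auto_start: bool) {
        self.components.push(Component { id, auto_start, running: false });
    }

    /// Server-level start: only auto_start components begin running.
    pub fn start(&mut self) {
        for c in self.components.iter_mut().filter(|c| c.auto_start) {
            c.running = true;
        }
    }

    /// Manual start of a single component, regardless of auto_start.
    pub fn start_component(&mut self, id: &str) {
        if let Some(c) = self.components.iter_mut().find(|c| c.id == id) {
            c.running = true;
        }
    }

    /// Server-level stop: every component stops.
    pub fn stop(&mut self) {
        for c in &mut self.components {
            c.running = false;
        }
    }

    /// Ids of the components that are currently running.
    pub fn running_ids(&self) -> Vec<&'static str> {
        self.components.iter().filter(|c| c.running).map(|c| c.id).collect()
    }
}
```

In this model a component added with auto_start=false and then started by hand runs until the next stop(), and stays stopped after the following start() until it is started by hand again.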
§Errors
Returns an error if:
- The server is not running (anyhow::Error)
- Any component fails to stop (logged as error, but doesn’t prevent other components from stopping)
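The "doesn't prevent other components from stopping" behavior is a common shutdown pattern: record each failure and keep going. A minimal sketch follows, assuming a hypothetical `stop_one` callback that stands in for whatever actually stops a component; it is not the library's implementation.

```rust
/// Hypothetical stop routine. `stop_one` is an assumed callback that stops
/// a single component and may fail.
pub fn stop_all<F>(ids: &[&str], mut stop_one: F) -> Vec<String>
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut failures = Vec::new();
    for &id in ids {
        // A failure is recorded (a real server would log it) and the loop
        // moves on, so one failing component cannot block the rest.
        if let Err(e) = stop_one(id) {
            failures.push(format!("{}: {}", id, e));
        }
    }
    failures
}
```

Collecting the failures instead of returning on the first error is what lets every remaining component get its chance to shut down.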
§Examples
// Stop server and all running components
core.stop().await?;
assert!(!core.is_running().await);
// Only auto_start=true components will be restarted
core.start().await?;
pub fn query_manager(&self) -> &QueryManager
Get direct access to the query manager (advanced usage)
This provides low-level access to the query manager for advanced scenarios.
Most users should use the higher-level methods like get_query_info(),
start_query(), etc. instead.
§Thread Safety
The returned reference is thread-safe and can be used across threads.
pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry>
Get access to the middleware registry
Returns a shared handle (Arc) to the middleware type registry, which contains all registered middleware factories. The registry is pre-populated with all standard middleware types (jq, map, unwind, relabel, decoder, parse_json, promote).
§Thread Safety
The returned Arc can be cloned and used across threads.
§Examples
let core = DrasiLib::builder().build().await?;
let registry = core.middleware_registry();
// Use registry to create middleware instances
pub async fn get_current_config(&self) -> Result<DrasiLibConfig>
Get a complete configuration snapshot of all components
Returns the full server configuration including all queries with their complete configurations.
Note: Sources and reactions are now instance-based and not stored in config.
Use list_sources() and list_reactions() for runtime information about these components.
§Example
let config = core.get_current_config().await?;
println!("Server has {} queries", config.queries.len());
pub fn builder() -> DrasiLibBuilder
Create a builder for configuring a new DrasiLib instance.
The builder provides a fluent API for adding queries and source/reaction instances.
Note: Sources and reactions are now instance-based. Use with_source() and with_reaction()
to add pre-built instances.
§Example
use drasi_lib::{DrasiLib, Query};
let core = DrasiLib::builder()
.with_id("my-server")
.with_query(
Query::cypher("my-query")
.query("MATCH (n) RETURN n")
.from_source("events")
.build()
)
// Use .with_source(source_instance) and .with_reaction(reaction_instance)
// for pre-built source and reaction instances
.build()
.await?;
core.start().await?;
pub async fn is_running(&self) -> bool
Check if the server is currently running
Returns true if start() has been called and the server is actively processing,
false if the server is stopped or has not been started yet.
§Thread Safety
This method is thread-safe and can be called concurrently.
§Examples
let core = DrasiLib::builder().build().await?;
assert!(!core.is_running().await); // Not started yet
core.start().await?;
assert!(core.is_running().await); // Now running
core.stop().await?;
assert!(!core.is_running().await); // Stopped again
pub fn get_config(&self) -> &RuntimeConfig
Get the runtime configuration
Returns a reference to the immutable runtime configuration containing server settings and all component configurations. This is the configuration that was provided during initialization (via builder, config file, or config string).
For a current snapshot of the configuration including runtime additions, use
get_current_config() instead.
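The difference between the two accessors can be pictured as a frozen build-time value next to live state. The sketch below is a hypothetical model (`ToyServer`, `initial_config`, and `current_snapshot` are invented names, and queries are reduced to plain id strings); it does not describe how drasi_lib stores its configuration.

```rust
use std::sync::RwLock;

/// Hypothetical server that keeps the configuration it was built with
/// separate from the queries registered at runtime.
pub struct ToyServer {
    initial_queries: Vec<String>,      // frozen at build time
    live_queries: RwLock<Vec<String>>, // changes at runtime
}

impl ToyServer {
    pub fn new(initial: Vec<String>) -> Self {
        Self {
            live_queries: RwLock::new(initial.clone()),
            initial_queries: initial,
        }
    }

    /// Register a query after construction; only the live state changes.
    pub fn add_query(&self, id: &str) {
        self.live_queries.write().unwrap().push(id.to_string());
    }

    /// Analogous to get_config(): a borrowed view of the build-time config.
    pub fn initial_config(&self) -> &[String] {
        &self.initial_queries
    }

    /// Analogous to get_current_config(): an owned snapshot of current state.
    pub fn current_snapshot(&self) -> Vec<String> {
        self.live_queries.read().unwrap().clone()
    }
}
```

The borrowed view is cheap and never changes, while the snapshot costs a copy but reflects runtime additions.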
§Thread Safety
This method is thread-safe and can be called concurrently.
§Examples
let core = DrasiLib::builder()
.with_id("my-server")
.build()
.await?;
let config = core.get_config();
println!("Server ID: {}", config.id);
println!("Number of queries: {}", config.queries.len());
impl DrasiLib
pub async fn add_query(&self, query: QueryConfig) -> Result<()>
Create a query in a running server
§Example
core.add_query(
Query::cypher("new-query")
.query("MATCH (n) RETURN n")
.from_source("source1")
.auto_start(true)
.build()
).await?;
pub async fn remove_query(&self, id: &str) -> Result<()>
Remove a query from a running server
If the query is running, it will be stopped first before removal.
§Example
core.remove_query("old-query").await?;
pub async fn start_query(&self, id: &str) -> Result<()>
Start a stopped query
This will create the necessary subscriptions to source data streams.
§Example
core.start_query("my-query").await?;
pub async fn stop_query(&self, id: &str) -> Result<()>
pub async fn list_queries(&self) -> Result<Vec<(String, ComponentStatus)>>
List all queries with their current status
§Example
let queries = core.list_queries().await?;
for (id, status) in queries {
println!("Query {}: {:?}", id, status);
}
pub async fn get_query_info(&self, id: &str) -> Result<QueryRuntime>
Get detailed information about a specific query
§Example
let query_info = core.get_query_info("my-query").await?;
println!("Query: {}", query_info.query);
println!("Status: {:?}", query_info.status);
println!("Source subscriptions: {:?}", query_info.source_subscriptions);
pub async fn get_query_status(&self, id: &str) -> Result<ComponentStatus>
Get the current status of a specific query
§Example
let status = core.get_query_status("my-query").await?;
println!("Query status: {:?}", status);
pub async fn get_query_results(&self, id: &str) -> Result<Vec<Value>>
Get the current result set for a running query
§Example
let results = core.get_query_results("my-query").await?;
println!("Current results: {} items", results.len());
pub async fn get_query_config(&self, id: &str) -> Result<QueryConfig>
Get the full configuration for a specific query
This returns the complete query configuration including all fields like auto_start and joins,
unlike get_query_info() which only returns runtime information.
§Example
let config = core.get_query_config("my-query").await?;
println!("Auto-start: {}", config.auto_start);
impl DrasiLib
pub async fn add_reaction(&self, reaction: impl Reaction + 'static) -> Result<()>
Add a reaction instance to a running server, taking ownership.
The reaction instance is wrapped in an Arc internally - callers transfer ownership rather than pre-wrapping in Arc.
If the server is running and the reaction has auto_start=true, the reaction
will be started immediately after being added.
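The "take ownership, wrap in an Arc internally" convention can be sketched with a small registry. This is an assumption-laden illustration: the `Reaction` trait here is a one-method stand-in for the real trait, and `LogReaction` and `Registry` are hypothetical types.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a reaction trait; the real trait is richer.
pub trait Reaction: Send + Sync {
    fn id(&self) -> String;
}

/// Hypothetical reaction used only for this example.
pub struct LogReaction {
    pub name: String,
}

impl Reaction for LogReaction {
    fn id(&self) -> String {
        self.name.clone()
    }
}

#[derive(Default)]
pub struct Registry {
    reactions: HashMap<String, Arc<dyn Reaction>>,
}

impl Registry {
    /// Takes the reaction by value and wraps it in an Arc internally, so
    /// callers never construct the Arc themselves.
    pub fn add_reaction(&mut self, reaction: impl Reaction + 'static) {
        let shared: Arc<dyn Reaction> = Arc::new(reaction);
        self.reactions.insert(shared.id(), shared);
    }

    /// Hands out another shared pointer to a registered reaction.
    pub fn get(&self, id: &str) -> Option<Arc<dyn Reaction>> {
        self.reactions.get(id).cloned()
    }
}
```

Accepting `impl Reaction + 'static` by value keeps the call site simple and guarantees the registry holds the only original; any sharing happens through Arcs it hands out itself.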
§Example
// Create the reaction and transfer ownership
// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
// core.add_reaction(reaction).await?; // Ownership transferred, auto-started if server running
pub async fn remove_reaction(&self, id: &str) -> Result<()>
Remove a reaction from a running server
If the reaction is running, it will be stopped first before removal.
§Example
core.remove_reaction("old-reaction").await?;
pub async fn start_reaction(&self, id: &str) -> Result<()>
Start a stopped reaction
This will create the necessary subscriptions to query result streams.
The QueryProvider was already injected when the reaction was added via add_reaction().
§Example
core.start_reaction("my-reaction").await?;
pub async fn stop_reaction(&self, id: &str) -> Result<()>
pub async fn list_reactions(&self) -> Result<Vec<(String, ComponentStatus)>>
List all reactions with their current status
§Example
let reactions = core.list_reactions().await?;
for (id, status) in reactions {
println!("Reaction {}: {:?}", id, status);
}
pub async fn get_reaction_info(&self, id: &str) -> Result<ReactionRuntime>
Get detailed information about a specific reaction
§Example
let reaction_info = core.get_reaction_info("my-reaction").await?;
println!("Reaction type: {}", reaction_info.reaction_type);
println!("Status: {:?}", reaction_info.status);
println!("Queries: {:?}", reaction_info.queries);
pub async fn get_reaction_status(&self, id: &str) -> Result<ComponentStatus>
Get the current status of a specific reaction
§Example
let status = core.get_reaction_status("my-reaction").await?;
println!("Reaction status: {:?}", status);
impl DrasiLib
pub async fn add_source(&self, source: impl Source + 'static) -> Result<()>
Add a source instance to a running server, taking ownership.
The source instance is wrapped in an Arc internally - callers transfer ownership rather than pre-wrapping in Arc.
If the server is running and the source has auto_start=true, the source
will be started immediately after being added.
§Example
// Create the source and transfer ownership
// let source = MySource::new("new-source", config)?;
// core.add_source(source).await?; // Ownership transferred, auto-started if server running
pub async fn remove_source(&self, id: &str) -> Result<()>
Remove a source from a running server
If the source is running, it will be stopped first before removal.
§Example
core.remove_source("old-source").await?;
pub async fn start_source(&self, id: &str) -> Result<()>
pub async fn stop_source(&self, id: &str) -> Result<()>
pub async fn list_sources(&self) -> Result<Vec<(String, ComponentStatus)>>
List all sources with their current status
§Example
let sources = core.list_sources().await?;
for (id, status) in sources {
println!("Source {}: {:?}", id, status);
}
pub async fn get_source_info(&self, id: &str) -> Result<SourceRuntime>
Get detailed information about a specific source
§Example
let source_info = core.get_source_info("my-source").await?;
println!("Source type: {}", source_info.source_type);
println!("Status: {:?}", source_info.status);
pub async fn get_source_status(&self, id: &str) -> Result<ComponentStatus>
Get the current status of a specific source
§Example
let status = core.get_source_status("my-source").await?;
println!("Source status: {:?}", status);