Struct DrasiLib 

pub struct DrasiLib { /* private fields */ }

Main server type. Use DrasiLib::builder() to create instances.

§Examples

use drasi_lib::DrasiLib;

let core = DrasiLib::builder()
    .with_id("my-server")
    .build()
    .await?;
core.start().await?;

Core Drasi Server for continuous query processing

DrasiLib is the main entry point for embedding Drasi functionality in your application. It manages sources (data ingestion), queries (continuous Cypher/GQL queries), and reactions (output destinations) with a reactive event-driven architecture.

§Architecture

  • Sources: Data ingestion points (PostgreSQL, HTTP, gRPC, Application, Mock, Platform)
  • Queries: Continuous Cypher or GQL queries that process data changes in real time
  • Reactions: Output destinations that receive query results (HTTP, gRPC, Application, Log)

§Lifecycle States

The server progresses through these states:

  1. Created (via builder())
  2. Initialized (one-time setup, automatic with builder)
  3. Running (after start())
  4. Stopped (after stop(), can be restarted)

Components (sources, queries, reactions) have independent lifecycle states:

  • Stopped: Component exists but is not processing
  • Starting: Component is initializing
  • Running: Component is actively processing
  • Stopping: Component is shutting down
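
The four component states can be read as a small state machine. The sketch below is illustrative only: `Status`, `Event`, and `transition` are hypothetical names, not the crate's own types, and the set of allowed transitions is an assumption inferred from the state descriptions above.

```rust
/// Illustrative stand-in for a component status; not the crate's own type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Stopped,
    Starting,
    Running,
    Stopping,
}

/// Hypothetical events that drive the transitions.
#[derive(Debug, Clone, Copy)]
enum Event {
    StartRequested,
    StartCompleted,
    StopRequested,
    StopCompleted,
}

/// Returns the next status, or `None` if the event is not valid in this state.
fn transition(current: Status, event: Event) -> Option<Status> {
    match (current, event) {
        (Status::Stopped, Event::StartRequested) => Some(Status::Starting),
        (Status::Starting, Event::StartCompleted) => Some(Status::Running),
        (Status::Running, Event::StopRequested) => Some(Status::Stopping),
        (Status::Stopping, Event::StopCompleted) => Some(Status::Stopped),
        // Every other combination is rejected in this sketch.
        _ => None,
    }
}
```

Modelling invalid transitions as `None` keeps the sketch small; a real implementation would more likely return a descriptive error.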

§Thread Safety

DrasiLib is Clone (all clones share the same underlying state) and all methods are thread-safe. You can safely share clones across threads and call methods concurrently.
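
One common way to get "clones share the same state" semantics is to keep the state behind an `Arc` and derive `Clone` on a thin handle. The following is a minimal sketch of that pattern using only the standard library; `Handle`, `record_start`, and `start_from_threads` are hypothetical names, and nothing here is taken from the crate's internals.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical handle type: cloning copies the `Arc`, not the state behind it.
#[derive(Clone)]
struct Handle {
    started: Arc<Mutex<Vec<String>>>,
}

impl Handle {
    fn new() -> Self {
        Handle { started: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Records a component id; `&self` suffices because the mutex provides interior mutability.
    fn record_start(&self, id: &str) {
        self.started.lock().unwrap().push(id.to_string());
    }

    fn started_count(&self) -> usize {
        self.started.lock().unwrap().len()
    }
}

/// Spawns one thread per id, each with its own clone, and returns the shared count.
fn start_from_threads(handle: &Handle, ids: &[&str]) -> usize {
    let workers: Vec<_> = ids
        .iter()
        .map(|id| {
            let clone = handle.clone();
            let id = id.to_string();
            thread::spawn(move || clone.record_start(&id))
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
    handle.started_count()
}
```

Every clone observes the writes made through any other clone, which is the behavior the paragraph above describes for DrasiLib.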

§Examples

§Builder Pattern

use drasi_lib::{DrasiLib, Query};

let core = DrasiLib::builder()
    .with_id("my-server")
    .with_source(my_source)  // Pre-built source instance
    .with_query(
        Query::cypher("my-query")
            .query("MATCH (n:Person) RETURN n.name, n.age")
            .from_source("events")
            .auto_start(true)
            .build()
    )
    .with_reaction(my_reaction)  // Pre-built reaction instance
    .build()
    .await?;

// Start all auto-start components
core.start().await?;

// List and inspect components
let sources = core.list_sources().await?;
let queries = core.list_queries().await?;

// Stop server
core.stop().await?;

§Dynamic Runtime Configuration

use drasi_lib::{DrasiLib, Query};

let core = DrasiLib::builder()
    .with_id("dynamic-server")
    .build()
    .await?;

core.start().await?;

// Add components at runtime
core.add_source(new_source_instance).await?;

core.add_query(
    Query::cypher("new-query")
        .query("MATCH (n) RETURN n")
        .from_source("new-source")
        .auto_start(true)
        .build()
).await?;

// Start/stop individual components
core.stop_query("new-query").await?;
core.start_query("new-query").await?;

// Remove components
core.remove_query("new-query").await?;
core.remove_source("new-source").await?;

§Restart Behavior

When you call stop() and then start() again, only components with auto_start=true will be started. Components that were manually started (with auto_start=false) will remain stopped:

core.start().await?;
// ... only auto_start=true components are running ...

core.stop().await?;
// ... all components stopped ...

core.start().await?;
// ... only auto_start=true components restarted ...
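
The restart rule can be checked against a toy model. This sketch does not use drasi_lib; `ToyServer` and `Component` are hypothetical types that track only an `auto_start` flag and a `running` flag, under the assumption that a server-level start considers nothing but `auto_start`.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-component record; the field names are illustrative.
struct Component {
    auto_start: bool,
    running: bool,
}

/// Toy server that models only the start/stop bookkeeping.
#[derive(Default)]
struct ToyServer {
    components: BTreeMap<String, Component>,
}

impl ToyServer {
    fn add(&mut self, id: &str, auto_start: bool) {
        self.components
            .insert(id.to_string(), Component { auto_start, running: false });
    }

    /// Server-level start: only `auto_start` components are started.
    fn start(&mut self) {
        for c in self.components.values_mut() {
            if c.auto_start {
                c.running = true;
            }
        }
    }

    /// Server-level stop: every component is stopped.
    fn stop(&mut self) {
        for c in self.components.values_mut() {
            c.running = false;
        }
    }

    /// Manual start of one component; returns false if the id is unknown.
    fn start_component(&mut self, id: &str) -> bool {
        match self.components.get_mut(id) {
            Some(c) => {
                c.running = true;
                true
            }
            None => false,
        }
    }

    /// Ids of running components, in sorted order.
    fn running_ids(&self) -> Vec<String> {
        self.components
            .iter()
            .filter(|(_, c)| c.running)
            .map(|(id, _)| id.clone())
            .collect()
    }
}
```

In this model a component added with `auto_start = false` and started by hand is running until `stop()`, and stays stopped after the next `start()`.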

Implementations§

impl DrasiLib

pub async fn start(&self) -> Result<()>

Start the server and all auto-start components

This starts all components (sources, queries, reactions) that have auto_start set to true. Components that were started manually before the last stop() are not restarted automatically; start them again individually.

Components are started in dependency order: Sources → Queries → Reactions
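
As a rough illustration of dependency ordering, the sketch below sorts components by kind for startup and reverses that order for shutdown. `Kind`, `start_order`, and `stop_order` are hypothetical helpers; how the library orders components of the same kind is not stated here, so preserving registration order within a kind is an assumption.

```rust
/// Illustrative component kinds, declared in start order so the derived `Ord` matches it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Kind {
    Source,
    Query,
    Reaction,
}

/// Returns ids in start order: sources, then queries, then reactions.
fn start_order(components: &[(&str, Kind)]) -> Vec<String> {
    let mut sorted = components.to_vec();
    // `sort_by_key` is stable, so registration order is kept within each kind.
    sorted.sort_by_key(|&(_, kind)| kind);
    sorted.into_iter().map(|(id, _)| id.to_string()).collect()
}

/// Returns ids in stop order, the exact reverse of the start order.
fn stop_order(components: &[(&str, Kind)]) -> Vec<String> {
    let mut ids = start_order(components);
    ids.reverse();
    ids
}
```

Starting producers before consumers means a query never subscribes to a source that is not yet up; stopping in reverse lets consumers drain before their inputs go away.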

§Errors

Returns an error if:

  • The server is not initialized (DrasiError::InvalidState)
  • The server is already running (anyhow::Error)
  • Any component fails to start (propagated from component)
§Examples
let core = DrasiLib::builder()
    .with_id("my-server")
    .build()
    .await?;

// Start server and all auto-start components
core.start().await?;

assert!(core.is_running().await);

pub async fn stop(&self) -> Result<()>

Stop the server and all running components

This stops all currently running components (sources, queries, reactions). Components are stopped in reverse dependency order: Reactions → Queries → Sources

On the next start(), only components with auto_start=true will be restarted.

§Errors

Returns an error if:

  • The server is not running (anyhow::Error)
  • Any component fails to stop (logged as error, but doesn’t prevent other components from stopping)
§Examples
// Stop server and all running components
core.stop().await?;

assert!(!core.is_running().await);

// Only auto_start=true components will be restarted
core.start().await?;

pub fn query_manager(&self) -> &QueryManager

Get direct access to the query manager (advanced usage)

This provides low-level access to the query manager for advanced scenarios. Most users should use the higher-level methods like get_query_info(), start_query(), etc. instead.

§Thread Safety

The returned reference is thread-safe and can be used across threads.

pub fn middleware_registry(&self) -> Arc<MiddlewareTypeRegistry>

Get access to the middleware registry

Returns a shared handle (Arc) to the middleware type registry, which contains all registered middleware factories. The registry is pre-populated with the standard middleware types (jq, map, unwind, relabel, decoder, parse_json, promote).

§Thread Safety

The returned Arc can be cloned and used across threads.

§Examples
let core = DrasiLib::builder().build().await?;
let registry = core.middleware_registry();
// Use registry to create middleware instances

pub async fn get_current_config(&self) -> Result<DrasiLibConfig>

Get a complete configuration snapshot of all components

Returns the full server configuration, including every query with its complete configuration. Sources and reactions are instance-based and are not stored in the configuration; use list_sources() and list_reactions() for runtime information about them.

§Example
let config = core.get_current_config().await?;
println!("Server has {} queries", config.queries.len());

pub fn builder() -> DrasiLibBuilder

Create a builder for configuring a new DrasiLib instance.

The builder provides a fluent API for adding queries and source/reaction instances. Note: Sources and reactions are now instance-based. Use with_source() and with_reaction() to add pre-built instances.

§Example
use drasi_lib::{DrasiLib, Query};

let core = DrasiLib::builder()
    .with_id("my-server")
    .with_query(
        Query::cypher("my-query")
            .query("MATCH (n) RETURN n")
            .from_source("events")
            .build()
    )
    // Use .with_source(source_instance) and .with_reaction(reaction_instance)
    // for pre-built source and reaction instances
    .build()
    .await?;

core.start().await?;

pub async fn is_running(&self) -> bool

Check if the server is currently running

Returns true if start() has been called and the server is actively processing, false if the server is stopped or has not been started yet.

§Thread Safety

This method is thread-safe and can be called concurrently.

§Examples
let core = DrasiLib::builder().build().await?;

assert!(!core.is_running().await); // Not started yet

core.start().await?;
assert!(core.is_running().await); // Now running

core.stop().await?;
assert!(!core.is_running().await); // Stopped again

pub fn get_config(&self) -> &RuntimeConfig

Get the runtime configuration

Returns a reference to the immutable runtime configuration containing server settings and all component configurations. This is the configuration that was provided during initialization (via builder, config file, or config string).

For a current snapshot of the configuration including runtime additions, use get_current_config() instead.
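
The difference between the two accessors can be sketched with a toy type that keeps the build-time configuration immutable and tracks runtime additions separately. `ToyCore` and `Config` are hypothetical and far simpler than the crate's real types; only the borrowed-versus-owned return shapes mirror the signatures documented on this page.

```rust
use std::sync::RwLock;

/// Hypothetical configuration shape with just an id and a list of query ids.
#[derive(Debug, Clone, PartialEq)]
struct Config {
    id: String,
    queries: Vec<String>,
}

struct ToyCore {
    /// Fixed at construction time.
    initial: Config,
    /// Changes as queries are added at runtime.
    live_queries: RwLock<Vec<String>>,
}

impl ToyCore {
    fn new(initial: Config) -> Self {
        let live = initial.queries.clone();
        ToyCore { initial, live_queries: RwLock::new(live) }
    }

    fn add_query(&self, id: &str) {
        self.live_queries.write().unwrap().push(id.to_string());
    }

    /// Borrowed view of what was supplied at construction; never changes.
    fn get_config(&self) -> &Config {
        &self.initial
    }

    /// Owned snapshot that reflects runtime additions.
    fn get_current_config(&self) -> Config {
        Config {
            id: self.initial.id.clone(),
            queries: self.live_queries.read().unwrap().clone(),
        }
    }
}
```

Returning a reference is cheap and safe for data that never changes, while the snapshot has to be an owned value because the live state may change right after the call returns.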

§Thread Safety

This method is thread-safe and can be called concurrently.

§Examples
let core = DrasiLib::builder()
    .with_id("my-server")
    .build()
    .await?;

let config = core.get_config();
println!("Server ID: {}", config.id);
println!("Number of queries: {}", config.queries.len());

impl DrasiLib

pub async fn add_query(&self, query: QueryConfig) -> Result<()>

Add a query to a running server

§Example
core.add_query(
    Query::cypher("new-query")
        .query("MATCH (n) RETURN n")
        .from_source("source1")
        .auto_start(true)
        .build()
).await?;

pub async fn remove_query(&self, id: &str) -> Result<()>

Remove a query from a running server

If the query is running, it will be stopped first before removal.

§Example
core.remove_query("old-query").await?;

pub async fn start_query(&self, id: &str) -> Result<()>

Start a stopped query

This will create the necessary subscriptions to source data streams.

§Example
core.start_query("my-query").await?;

pub async fn stop_query(&self, id: &str) -> Result<()>

Stop a running query

§Example
core.stop_query("my-query").await?;

pub async fn list_queries(&self) -> Result<Vec<(String, ComponentStatus)>>

List all queries with their current status

§Example
let queries = core.list_queries().await?;
for (id, status) in queries {
    println!("Query {}: {:?}", id, status);
}

pub async fn get_query_info(&self, id: &str) -> Result<QueryRuntime>

Get detailed information about a specific query

§Example
let query_info = core.get_query_info("my-query").await?;
println!("Query: {}", query_info.query);
println!("Status: {:?}", query_info.status);
println!("Source subscriptions: {:?}", query_info.source_subscriptions);

pub async fn get_query_status(&self, id: &str) -> Result<ComponentStatus>

Get the current status of a specific query

§Example
let status = core.get_query_status("my-query").await?;
println!("Query status: {:?}", status);

pub async fn get_query_results(&self, id: &str) -> Result<Vec<Value>>

Get the current result set for a running query

§Example
let results = core.get_query_results("my-query").await?;
println!("Current results: {} items", results.len());

pub async fn get_query_config(&self, id: &str) -> Result<QueryConfig>

Get the full configuration for a specific query

This returns the complete query configuration including all fields like auto_start and joins, unlike get_query_info() which only returns runtime information.

§Example
let config = core.get_query_config("my-query").await?;
println!("Auto-start: {}", config.auto_start);

impl DrasiLib

pub async fn add_reaction( &self, reaction: impl Reaction + 'static, ) -> Result<()>

Add a reaction instance to a running server, taking ownership.

The reaction instance is wrapped in an Arc internally - callers transfer ownership rather than pre-wrapping in Arc.

If the server is running and the reaction has auto_start=true, the reaction will be started immediately after being added.
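
The ownership-transfer pattern described above (accept `impl Trait + 'static` by value, wrap it in an `Arc` internally) can be sketched with the standard library alone. The `Reaction` trait below is a hypothetical one-method stand-in for the real trait, and `Registry` and `LogReaction` are illustrative names; rejecting duplicate ids is an assumption of this sketch, not documented behavior.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a reaction trait; the real trait has more methods.
trait Reaction: Send + Sync {
    fn id(&self) -> String;
}

struct LogReaction {
    id: String,
}

impl Reaction for LogReaction {
    fn id(&self) -> String {
        self.id.clone()
    }
}

#[derive(Default)]
struct Registry {
    reactions: Mutex<HashMap<String, Arc<dyn Reaction>>>,
}

impl Registry {
    /// Takes the reaction by value and wraps it in an `Arc` internally.
    /// Returns an error if the id is already registered (an assumption of this sketch).
    fn add_reaction(&self, reaction: impl Reaction + 'static) -> Result<(), String> {
        let id = reaction.id();
        let shared: Arc<dyn Reaction> = Arc::new(reaction);
        let mut map = self.reactions.lock().unwrap();
        if map.contains_key(&id) {
            return Err(format!("reaction '{id}' already exists"));
        }
        map.insert(id, shared);
        Ok(())
    }

    fn len(&self) -> usize {
        self.reactions.lock().unwrap().len()
    }
}
```

Taking the value rather than an `Arc` keeps the call site simple and guarantees the registry holds the only handle at the moment of insertion.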

§Example
// Create the reaction and transfer ownership
// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
// core.add_reaction(reaction).await?;  // Ownership transferred, auto-started if server running

pub async fn remove_reaction(&self, id: &str) -> Result<()>

Remove a reaction from a running server

If the reaction is running, it will be stopped first before removal.

§Example
core.remove_reaction("old-reaction").await?;

pub async fn start_reaction(&self, id: &str) -> Result<()>

Start a stopped reaction

This will create the necessary subscriptions to query result streams. The QueryProvider was already injected when the reaction was added via add_reaction().

§Example
core.start_reaction("my-reaction").await?;

pub async fn stop_reaction(&self, id: &str) -> Result<()>

Stop a running reaction

§Example
core.stop_reaction("my-reaction").await?;

pub async fn list_reactions(&self) -> Result<Vec<(String, ComponentStatus)>>

List all reactions with their current status

§Example
let reactions = core.list_reactions().await?;
for (id, status) in reactions {
    println!("Reaction {}: {:?}", id, status);
}

pub async fn get_reaction_info(&self, id: &str) -> Result<ReactionRuntime>

Get detailed information about a specific reaction

§Example
let reaction_info = core.get_reaction_info("my-reaction").await?;
println!("Reaction type: {}", reaction_info.reaction_type);
println!("Status: {:?}", reaction_info.status);
println!("Queries: {:?}", reaction_info.queries);

pub async fn get_reaction_status(&self, id: &str) -> Result<ComponentStatus>

Get the current status of a specific reaction

§Example
let status = core.get_reaction_status("my-reaction").await?;
println!("Reaction status: {:?}", status);

impl DrasiLib

pub async fn add_source(&self, source: impl Source + 'static) -> Result<()>

Add a source instance to a running server, taking ownership.

The source instance is wrapped in an Arc internally - callers transfer ownership rather than pre-wrapping in Arc.

If the server is running and the source has auto_start=true, the source will be started immediately after being added.

§Example
// Create the source and transfer ownership
// let source = MySource::new("new-source", config)?;
// core.add_source(source).await?;  // Ownership transferred, auto-started if server running

pub async fn remove_source(&self, id: &str) -> Result<()>

Remove a source from a running server

If the source is running, it will be stopped first before removal.

§Example
core.remove_source("old-source").await?;

pub async fn start_source(&self, id: &str) -> Result<()>

Start a stopped source

§Example
core.start_source("my-source").await?;

pub async fn stop_source(&self, id: &str) -> Result<()>

Stop a running source

§Example
core.stop_source("my-source").await?;

pub async fn list_sources(&self) -> Result<Vec<(String, ComponentStatus)>>

List all sources with their current status

§Example
let sources = core.list_sources().await?;
for (id, status) in sources {
    println!("Source {}: {:?}", id, status);
}

pub async fn get_source_info(&self, id: &str) -> Result<SourceRuntime>

Get detailed information about a specific source

§Example
let source_info = core.get_source_info("my-source").await?;
println!("Source type: {}", source_info.source_type);
println!("Status: {:?}", source_info.status);

pub async fn get_source_status(&self, id: &str) -> Result<ComponentStatus>

Get the current status of a specific source

§Example
let status = core.get_source_status("my-source").await?;
println!("Source status: {:?}", status);

Trait Implementations§

impl Clone for DrasiLib

fn clone(&self) -> Self

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl QueryProvider for DrasiLib

fn get_query_instance<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Arc<dyn Query>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get a query instance by ID
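
The 'life0, 'life1, and 'async_trait lifetimes in the signature above are characteristic of an async method in a trait after macro expansion; conceptually it reads as an async fn that takes &self and id: &str. The sketch below hand-writes an equivalent boxed-future method using only the standard library. The `Query` trait, `StaticQuery`, `OneQuery`, the `String` error type, and the tiny `block_on` helper are all hypothetical stand-ins, not the crate's definitions.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the query trait object.
trait Query: Send + Sync {
    fn id(&self) -> String;
}

struct StaticQuery(String);

impl Query for StaticQuery {
    fn id(&self) -> String {
        self.0.clone()
    }
}

/// Boxed future returned by the provider; the error type is simplified to `String`.
type QueryFuture<'a> = Pin<Box<dyn Future<Output = Result<Arc<dyn Query>, String>> + Send + 'a>>;

/// Hand-written equivalent of an async method in a trait.
trait QueryProvider {
    fn get_query_instance<'a>(&'a self, id: &'a str) -> QueryFuture<'a>;
}

struct OneQuery;

impl QueryProvider for OneQuery {
    fn get_query_instance<'a>(&'a self, id: &'a str) -> QueryFuture<'a> {
        Box::pin(async move {
            if id == "my-query" {
                Ok(Arc::new(StaticQuery(id.to_string())) as Arc<dyn Query>)
            } else {
                Err(format!("unknown query '{id}'"))
            }
        })
    }
}

/// Waker that does nothing; sufficient for futures that complete on the first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch: polls in a loop until the future is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

In application code the returned future would simply be awaited inside an async runtime; the busy-polling `block_on` exists only to keep the sketch dependency-free.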
