Struct ReactionBase 

pub struct ReactionBase {
    pub id: String,
    pub queries: Vec<String>,
    pub auto_start: bool,
    pub recovery_policy: Option<ReactionRecoveryPolicy>,
    pub priority_queue: PriorityQueue<QueryResult>,
    pub subscription_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
    pub processing_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    pub shutdown_tx: Arc<RwLock<Option<Sender<()>>>>,
    /* private fields */
}
Base implementation of functionality common to reaction plugins.

Fields§

§id: String

Reaction identifier

§queries: Vec<String>

List of query IDs to subscribe to

§auto_start: bool

Whether this reaction should auto-start

§recovery_policy: Option<ReactionRecoveryPolicy>

Optional recovery policy override

§priority_queue: PriorityQueue<QueryResult>

Priority queue for timestamp-ordered result processing

§subscription_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>

Handles to subscription forwarder tasks

§processing_task: Arc<RwLock<Option<JoinHandle<()>>>>

Handle to the main processing task

§shutdown_tx: Arc<RwLock<Option<Sender<()>>>>

Sender for shutdown signal to processing task

Implementations§

impl ReactionBase

pub fn new(params: ReactionBaseParams) -> Self

Create a new ReactionBase with the given parameters

Dependencies (query subscriber, state store, graph) are not required during construction; they are provided via initialize() when the reaction is added to DrasiLib.

pub async fn initialize(&self, context: ReactionRuntimeContext)

Initialize the reaction with runtime context.

This method is called automatically by DrasiLib’s add_reaction() method. Plugin developers do not need to call this directly.

The context provides access to:

  • reaction_id: The reaction’s unique identifier
  • state_store: Optional persistent state storage
  • update_tx: mpsc sender for fire-and-forget status updates to the graph

pub async fn context(&self) -> Option<ReactionRuntimeContext>

Get the runtime context if initialized.

Returns None if initialize() has not been called yet.

pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>>

Get the state store if configured.

Returns None if no state store was provided in the context.

pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>>

Get the identity provider if set.

Returns the identity provider set either programmatically via set_identity_provider() or from the runtime context during initialize(). Programmatically-set providers take precedence over context providers.

pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>)

Set the identity provider programmatically.

This is typically called during reaction construction when the provider is available from configuration (e.g., with_identity_provider() builder). Providers set this way take precedence over context-injected providers.
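
The precedence rule described for identity_provider() and set_identity_provider() can be sketched with two optional slots. This is a std-only model, not the crate's code: the ProviderSlots type, its method names, and the name() method on the stand-in trait are assumptions made for illustration.

```rust
use std::sync::{Arc, RwLock};

// Stand-in for the crate's IdentityProvider trait; `name` is a hypothetical method.
trait IdentityProvider: Send + Sync {
    fn name(&self) -> String;
}

struct Named(&'static str);

impl IdentityProvider for Named {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

// Hypothetical model: one slot filled by a builder, one filled from the runtime context.
#[derive(Default)]
struct ProviderSlots {
    programmatic: RwLock<Option<Arc<dyn IdentityProvider>>>,
    from_context: RwLock<Option<Arc<dyn IdentityProvider>>>,
}

impl ProviderSlots {
    fn set_programmatic(&self, provider: Arc<dyn IdentityProvider>) {
        *self.programmatic.write().unwrap() = Some(provider);
    }

    fn set_from_context(&self, provider: Arc<dyn IdentityProvider>) {
        *self.from_context.write().unwrap() = Some(provider);
    }

    // The programmatic provider wins; the context provider is only a fallback.
    fn resolve(&self) -> Option<Arc<dyn IdentityProvider>> {
        let programmatic = self.programmatic.read().unwrap().clone();
        programmatic.or_else(|| self.from_context.read().unwrap().clone())
    }
}

fn main() {
    let slots = ProviderSlots::default();
    slots.set_from_context(Arc::new(Named("context")));
    slots.set_programmatic(Arc::new(Named("builder")));
    let chosen = slots.resolve().map(|p| p.name());
    println!("resolved provider: {:?}", chosen);
}
```

In this model the order in which the two slots are filled does not matter; only the lookup order in resolve() decides which provider is returned.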

pub fn get_auto_start(&self) -> bool

Get whether this reaction should auto-start

pub fn set_raw_config(&mut self, config: Value)

Set the original raw config JSON for lossless persistence roundtrips.

pub fn raw_config(&self) -> Option<&Value>

Get the original raw config JSON, if set by a descriptor.

pub fn properties_or_serialize<D: Serialize>( &self, fallback_dto: &D, ) -> HashMap<String, Value>

Build the properties map for this reaction.

If raw_config was set (descriptor path), returns its top-level keys. Otherwise, serializes fallback_dto (the DTO reconstructed from typed config) to produce camelCase output.

This eliminates the duplicated if-let + serialize pattern from plugins.

pub fn clone_shared(&self) -> Self

Clone the ReactionBase with shared Arc references

This creates a new ReactionBase that shares the same underlying data through Arc references. Useful for passing to spawned tasks.
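
What "shared Arc references" means for a spawned task can be illustrated with a small std-only model. SharedBase and its handled field are hypothetical stand-ins, and a std thread takes the place of an async task; the real ReactionBase has more fields than shown here.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for a struct whose mutable state lives behind Arc.
struct SharedBase {
    id: String,
    handled: Arc<RwLock<Vec<String>>>,
}

impl SharedBase {
    // Plain fields are copied; Arc fields are re-pointed at the same allocation.
    fn clone_shared(&self) -> Self {
        Self {
            id: self.id.clone(),
            handled: Arc::clone(&self.handled),
        }
    }
}

fn main() {
    let base = SharedBase {
        id: "reaction-1".to_string(),
        handled: Arc::new(RwLock::new(Vec::new())),
    };

    // The clone moves into the worker; the original stays with the caller.
    let for_task = base.clone_shared();
    let worker = thread::spawn(move || {
        for_task
            .handled
            .write()
            .unwrap()
            .push(format!("{}: processed", for_task.id));
    });
    worker.join().unwrap();

    // The write made through the clone is visible through the original.
    println!("{:?}", base.handled.read().unwrap());
}
```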

pub async fn create_shutdown_channel(&self) -> Receiver<()>

Create a shutdown channel and store the sender

Returns the receiver which should be passed to the processing task. The sender is stored internally and will be triggered by stop_common().

This should be called before spawning the processing task.
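
The create-then-spawn ordering can be sketched as follows. This is a simplified model that assumes std::sync::mpsc and threads in place of the async channel and task types the crate uses; ShutdownSlot, signal_shutdown, and run_worker are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical holder for the stored sender half.
struct ShutdownSlot {
    tx: Arc<Mutex<Option<Sender<()>>>>,
}

impl ShutdownSlot {
    fn new() -> Self {
        Self { tx: Arc::new(Mutex::new(None)) }
    }

    // Store the sender internally and hand the receiver to the caller.
    fn create_shutdown_channel(&self) -> Receiver<()> {
        let (tx, rx) = mpsc::channel();
        *self.tx.lock().unwrap() = Some(tx);
        rx
    }

    // Stand-in for the signalling step of a stop routine.
    // Returns true if a signal was delivered to a live receiver.
    fn signal_shutdown(&self) -> bool {
        match self.tx.lock().unwrap().take() {
            Some(tx) => tx.send(()).is_ok(),
            None => false,
        }
    }
}

// Worker loop: do a unit of work, then check for the shutdown signal.
fn run_worker(shutdown_rx: Receiver<()>) -> u32 {
    let mut iterations = 0;
    loop {
        match shutdown_rx.try_recv() {
            Ok(()) | Err(TryRecvError::Disconnected) => return iterations,
            Err(TryRecvError::Empty) => {
                iterations += 1;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}

fn main() {
    let slot = ShutdownSlot::new();
    // The channel is created first so the worker owns its receiver from the start.
    let rx = slot.create_shutdown_channel();
    let worker = thread::spawn(move || run_worker(rx));

    thread::sleep(Duration::from_millis(10));
    slot.signal_shutdown();
    let iterations = worker.join().unwrap();
    println!("worker stopped after {iterations} iterations");
}
```

Creating the channel before spawning matters because the worker needs the receiver at spawn time; a sender created afterwards would have no way to reach an already running task.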

pub fn get_id(&self) -> &str

Get the reaction ID

pub fn get_queries(&self) -> &[String]

Get the query IDs

pub async fn get_status(&self) -> ComponentStatus

Get current status.

pub fn status_handle(&self) -> ComponentStatusHandle

Returns a cloneable ComponentStatusHandle for use in spawned tasks.

The handle can both read and write the component’s status and automatically notifies the graph on every status change (after initialize()).
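
A cloneable handle that reads, writes, and notifies could look roughly like the sketch below. It is an assumption-laden model: the Status variants, the StatusHandle type, and the std channel standing in for the graph's update channel are all hypothetical, and the real ComponentStatusHandle is async.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical status values; the real ComponentStatus variants are not listed on this page.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Stopped,
    Running,
}

// Clones share both the current status and the notification sender.
#[derive(Clone)]
struct StatusHandle {
    current: Arc<Mutex<Status>>,
    update_tx: Arc<Mutex<Option<Sender<Status>>>>,
}

impl StatusHandle {
    fn new() -> Self {
        Self {
            current: Arc::new(Mutex::new(Status::Stopped)),
            update_tx: Arc::new(Mutex::new(None)),
        }
    }

    // Models initialization: from here on, every change is forwarded.
    fn initialize(&self) -> Receiver<Status> {
        let (tx, rx) = mpsc::channel();
        *self.update_tx.lock().unwrap() = Some(tx);
        rx
    }

    fn get(&self) -> Status {
        *self.current.lock().unwrap()
    }

    // Update local state, then notify if a sender is wired up (fire-and-forget).
    fn set(&self, status: Status) {
        *self.current.lock().unwrap() = status;
        if let Some(tx) = self.update_tx.lock().unwrap().as_ref() {
            let _ = tx.send(status);
        }
    }
}

fn main() {
    let handle = StatusHandle::new();
    let graph_rx = handle.initialize();

    // A clone moves into the spawned worker and reports progress from there.
    let task_handle = handle.clone();
    thread::spawn(move || task_handle.set(Status::Running))
        .join()
        .unwrap();

    println!("status: {:?}, graph saw: {:?}", handle.get(), graph_rx.recv().unwrap());
}
```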

pub async fn set_status(&self, status: ComponentStatus, message: Option<String>)

Set the component’s status — updates local state AND notifies the graph.

This is the single canonical way to change a reaction’s status.

pub async fn enqueue_query_result(&self, result: QueryResult) -> Result<()>

Enqueue a query result for processing.

The host calls this to forward query results to the reaction’s priority queue. Results are processed in timestamp order by the reaction’s processing task.
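
Timestamp-ordered processing can be pictured with a min-heap keyed on the timestamp. The sketch below uses std's BinaryHeap rather than the crate's PriorityQueue, and the TimedResult fields are assumptions; the real QueryResult layout is not shown on this page.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical result type. The derived Ord compares fields in declaration
// order, so timestamp_ms is the primary sort key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TimedResult {
    timestamp_ms: u64,
    query_id: String,
}

// BinaryHeap is a max-heap; wrapping items in Reverse turns it into a min-heap.
#[derive(Default)]
struct TimestampQueue {
    heap: BinaryHeap<Reverse<TimedResult>>,
}

impl TimestampQueue {
    fn enqueue(&mut self, result: TimedResult) {
        self.heap.push(Reverse(result));
    }

    // Always yields the oldest pending result, regardless of arrival order.
    fn dequeue(&mut self) -> Option<TimedResult> {
        self.heap.pop().map(|Reverse(result)| result)
    }
}

fn main() {
    let mut queue = TimestampQueue::default();
    for (ts, query) in [(30, "q-a"), (10, "q-b"), (20, "q-a")] {
        queue.enqueue(TimedResult {
            timestamp_ms: ts,
            query_id: query.to_string(),
        });
    }
    while let Some(result) = queue.dequeue() {
        println!("{} from {}", result.timestamp_ms, result.query_id);
    }
}
```

Results enqueued out of order drain in ascending timestamp order, which is the property a processing task relies on when several queries feed one reaction.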

pub async fn read_checkpoint( &self, query_id: &str, ) -> Result<Option<ReactionCheckpoint>>

Read the persisted checkpoint for a single query subscription.

Returns Ok(None) if no checkpoint exists (fresh start). Errors propagate from the state store or deserialization.

pub async fn read_all_checkpoints( &self, ) -> Result<HashMap<String, ReactionCheckpoint>>

Read all persisted checkpoints for every query this reaction subscribes to.

Returns a map from query ID to checkpoint. Missing checkpoints (fresh subscriptions) are simply omitted from the result.

pub async fn write_checkpoint( &self, query_id: &str, checkpoint: &ReactionCheckpoint, ) -> Result<()>

Persist a checkpoint for a single query subscription.

This atomically writes the checkpoint to the state store. It should be called after a batch of query results has been successfully processed.
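
The read, read-all, and write semantics of the three checkpoint methods above can be modelled with an in-memory map. This is only a sketch: CheckpointStore is hypothetical, the Checkpoint fields (sequence, config_hash) are taken from the names this page mentions elsewhere, and the real methods are async and go through the state store.

```rust
use std::collections::HashMap;

// Assumed checkpoint shape; the real ReactionCheckpoint may carry more fields.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    sequence: u64,
    config_hash: u64,
}

// Hypothetical in-memory stand-in for the reaction's state store partition.
struct CheckpointStore {
    subscribed: Vec<String>,
    persisted: HashMap<String, Checkpoint>,
}

impl CheckpointStore {
    // None means a fresh start for that query.
    fn read(&self, query_id: &str) -> Option<Checkpoint> {
        self.persisted.get(query_id).cloned()
    }

    // Queries without a checkpoint are omitted rather than mapped to a default.
    fn read_all(&self) -> HashMap<String, Checkpoint> {
        self.subscribed
            .iter()
            .filter_map(|q| self.read(q).map(|cp| (q.clone(), cp)))
            .collect()
    }

    // Overwrites any previous checkpoint for the query.
    fn write(&mut self, query_id: &str, checkpoint: Checkpoint) {
        self.persisted.insert(query_id.to_string(), checkpoint);
    }
}

fn main() {
    let mut store = CheckpointStore {
        subscribed: vec!["q1".to_string(), "q2".to_string()],
        persisted: HashMap::new(),
    };
    // Written only after the batch for q1 has been handled successfully.
    store.write("q1", Checkpoint { sequence: 7, config_hash: 99 });
    println!("{:?}", store.read_all());
}
```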

pub async fn stop_common(&self) -> Result<()>

Perform common cleanup operations

This method handles:

  1. Sending shutdown signal to processing task (for graceful termination)
  2. Aborting all subscription forwarder tasks
  3. Waiting for or aborting the processing task
  4. Draining the priority queue
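
The ordering of these steps can be sketched with std threads. Note the limits of the model: std threads cannot be aborted, so step 2 is left out and step 3 only waits; the Runtime type and its fields are hypothetical stand-ins for the fields listed on this page.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical, simplified view of the state that a stop routine touches.
struct Runtime {
    shutdown_tx: Option<Sender<()>>,
    processing_task: Option<JoinHandle<()>>,
    queue: Arc<Mutex<VecDeque<u64>>>,
}

impl Runtime {
    // Returns how many pending results were discarded.
    fn stop_common(&mut self) -> usize {
        // Step 1: ask the processing task to finish gracefully.
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }
        // Step 2 (aborting forwarder tasks) is not modelled here.
        // Step 3: wait for the processing task to exit.
        if let Some(task) = self.processing_task.take() {
            let _ = task.join();
        }
        // Step 4: drain whatever is still queued.
        let mut queue = self.queue.lock().unwrap();
        let drained = queue.len();
        queue.clear();
        drained
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<()>();
    let queue = Arc::new(Mutex::new(VecDeque::from([1u64, 2, 3])));
    // The worker simply blocks until it is told to stop.
    let worker = thread::spawn(move || {
        let _ = rx.recv();
    });

    let mut runtime = Runtime {
        shutdown_tx: Some(tx),
        processing_task: Some(worker),
        queue,
    };
    let drained = runtime.stop_common();
    println!("drained {drained} pending results");
}
```

Signalling before joining is what keeps the wait in step 3 from blocking forever, and taking the sender and handle out of their Option slots makes a second call a harmless no-op in this model.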

pub async fn deprovision_common(&self) -> Result<()>

Clear the reaction’s state store partition.

This is called during deprovision to remove all persisted state associated with this reaction. Reactions that override deprovision() can call this to clean up their state store.

pub async fn set_processing_task(&self, task: JoinHandle<()>)

Set the processing task handle

pub async fn run_standard_loop<F, Fut>( &self, shutdown_rx: Receiver<()>, initial_checkpoints: HashMap<String, ReactionCheckpoint>, handler: F, ) -> Result<()>
where F: Fn(Arc<QueryResult>) -> Fut + Send + Sync, Fut: Future<Output = Result<()>> + Send,

Run a standard dequeue → dedup → handler → checkpoint loop.

This is an optional convenience for reactions that follow the common pattern. Reactions needing custom scheduling, batching, or multi-query ordering should implement their own loop.

The loop:

  1. Dequeues from the priority queue (blocks until available).
  2. Checks the event’s sequence against the persisted checkpoint — events at or before the checkpoint are silently skipped (dedup).
  3. Calls handler with the event.
  4. On success, writes a new checkpoint with the event’s sequence, preserving the config_hash from the initial checkpoint map (or 0 if no prior checkpoint exists for that query).
  5. Breaks when shutdown_rx fires.

§Arguments

  • shutdown_rx: receiver created via create_shutdown_channel().
  • initial_checkpoints: pre-loaded checkpoint map (from bootstrap orchestration). The loop uses these for dedup and preserves each query’s config_hash when advancing the sequence.
  • handler: async function receiving an Arc<QueryResult>. Return Ok(()) to advance the checkpoint, or Err to leave it unchanged (the event will NOT be retried automatically).
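
The dequeue, dedup, handler, checkpoint cycle can be sketched synchronously. This is a model under stated assumptions, not the crate's loop: Event, Checkpoint, and run_loop are hypothetical, a VecDeque stands in for the priority queue, an in-memory map stands in for persisted checkpoints, and the loop ends when the queue is empty instead of on a shutdown signal.

```rust
use std::collections::{HashMap, VecDeque};

// Assumed checkpoint shape, using the field names mentioned in the description.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    sequence: u64,
    config_hash: u64,
}

// Hypothetical event: just enough to show dedup by sequence.
#[derive(Debug, Clone)]
struct Event {
    query_id: String,
    sequence: u64,
}

fn run_loop<H>(
    queue: &mut VecDeque<Event>,
    mut checkpoints: HashMap<String, Checkpoint>,
    mut handler: H,
) -> HashMap<String, Checkpoint>
where
    H: FnMut(&Event) -> Result<(), String>,
{
    // 1. Dequeue the next event.
    while let Some(event) = queue.pop_front() {
        let prior = checkpoints.get(&event.query_id).cloned();

        // 2. Dedup: skip events at or before the checkpoint.
        if let Some(cp) = &prior {
            if event.sequence <= cp.sequence {
                continue;
            }
        }

        // 3. Call the handler; 4. advance the checkpoint only on success.
        if handler(&event).is_ok() {
            // Keep the existing config_hash, or 0 when the query had no checkpoint.
            let config_hash = prior.map(|cp| cp.config_hash).unwrap_or(0);
            checkpoints.insert(
                event.query_id.clone(),
                Checkpoint { sequence: event.sequence, config_hash },
            );
        }
        // On Err the checkpoint is left unchanged and the event is not retried.
    }
    checkpoints
}

fn main() {
    let mut queue: VecDeque<Event> = [("q1", 5), ("q1", 6), ("q2", 1)]
        .into_iter()
        .map(|(q, s)| Event { query_id: q.to_string(), sequence: s })
        .collect();

    let mut initial = HashMap::new();
    initial.insert("q1".to_string(), Checkpoint { sequence: 5, config_hash: 42 });

    let result = run_loop(&mut queue, initial, |event| {
        println!("handling {} #{}", event.query_id, event.sequence);
        Ok(())
    });
    println!("{:?}", result.get("q1"));
}
```

In this run the q1 event with sequence 5 is skipped because the initial checkpoint already covers it, while q2 starts fresh and receives a config_hash of 0.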

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.