pub struct ReactionBase {
pub id: String,
pub queries: Vec<String>,
pub auto_start: bool,
pub recovery_policy: Option<ReactionRecoveryPolicy>,
pub priority_queue: PriorityQueue<QueryResult>,
pub subscription_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>,
pub processing_task: Arc<RwLock<Option<JoinHandle<()>>>>,
pub shutdown_tx: Arc<RwLock<Option<Sender<()>>>>,
/* private fields */
}
Base implementation of common functionality for reaction plugins.
Fields
id: String
Reaction identifier
queries: Vec<String>
List of query IDs to subscribe to
auto_start: bool
Whether this reaction should auto-start
recovery_policy: Option<ReactionRecoveryPolicy>
Optional recovery policy override
priority_queue: PriorityQueue<QueryResult>
Priority queue for timestamp-ordered result processing
subscription_tasks: Arc<RwLock<Vec<JoinHandle<()>>>>
Handles to subscription forwarder tasks
processing_task: Arc<RwLock<Option<JoinHandle<()>>>>
Handle to the main processing task
shutdown_tx: Arc<RwLock<Option<Sender<()>>>>
Sender for shutdown signal to processing task
Implementations
impl ReactionBase
pub fn new(params: ReactionBaseParams) -> Self
Create a new ReactionBase with the given parameters
Dependencies (query subscriber, state store, graph) are not required during
construction - they will be provided via initialize() when the reaction is added to DrasiLib.
pub async fn initialize(&self, context: ReactionRuntimeContext)
Initialize the reaction with runtime context.
This method is called automatically by DrasiLib’s add_reaction() method.
Plugin developers do not need to call this directly.
The context provides access to:
- reaction_id: The reaction’s unique identifier
- state_store: Optional persistent state storage
- update_tx: mpsc sender for fire-and-forget status updates to the graph
pub async fn context(&self) -> Option<ReactionRuntimeContext>
Get the runtime context if initialized.
Returns None if initialize() has not been called yet.
pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>>
Get the state store if configured.
Returns None if no state store was provided in the context.
pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>>
Get the identity provider if set.
Returns the identity provider set either programmatically via
set_identity_provider() or from the runtime context during initialize().
Programmatically-set providers take precedence over context providers.
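The precedence rule above can be pictured with a small, synchronous sketch. It is not the crate's implementation: the trait body, `StaticProvider`, and `resolve_provider` are hypothetical stand-ins introduced only to show that a programmatically set provider shadows the one from the context.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the IdentityProvider trait; the real trait's
// methods are not shown in this documentation.
pub trait IdentityProvider {
    fn name(&self) -> String;
}

// Hypothetical provider that just reports a fixed label.
pub struct StaticProvider(pub &'static str);

impl IdentityProvider for StaticProvider {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

// Resolve the effective provider: a programmatically set provider wins,
// otherwise fall back to the one injected through the runtime context.
pub fn resolve_provider(
    programmatic: Option<Arc<dyn IdentityProvider>>,
    from_context: Option<Arc<dyn IdentityProvider>>,
) -> Option<Arc<dyn IdentityProvider>> {
    programmatic.or(from_context)
}
```

With both providers present the first argument is returned; with only a context provider, that one is returned; with neither, the result is None.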
pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>)
Set the identity provider programmatically.
This is typically called during reaction construction when the provider
is available from configuration (e.g., with_identity_provider() builder).
Providers set this way take precedence over context-injected providers.
pub fn get_auto_start(&self) -> bool
Get whether this reaction should auto-start
pub fn set_raw_config(&mut self, config: Value)
Set the original raw config JSON for lossless persistence roundtrips.
pub fn raw_config(&self) -> Option<&Value>
Get the original raw config JSON, if set by a descriptor.
pub fn properties_or_serialize<D: Serialize>(&self, fallback_dto: &D) -> HashMap<String, Value>
Build the properties map for this reaction.
If raw_config was set (descriptor path), returns its top-level keys.
Otherwise, serializes fallback_dto (the DTO reconstructed from typed
config) to produce camelCase output.
This eliminates the duplicated if-let + serialize pattern from plugins.
Clone the ReactionBase with shared Arc references
This creates a new ReactionBase that shares the same underlying data through Arc references. Useful for passing to spawned tasks.
pub async fn create_shutdown_channel(&self) -> Receiver<()>
Create a shutdown channel and store the sender
Returns the receiver which should be passed to the processing task.
The sender is stored internally and will be triggered by stop_common().
This should be called before spawning the processing task.
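The ordering described here (create the channel, then spawn the task with the receiver, then signal on stop) can be sketched with the standard library alone. This is an analogue under stated assumptions, not the crate's code: the real method is async and its channel type is not shown in this documentation, while `ShutdownSlot`, `signal_shutdown`, and `spawn_worker` are hypothetical names using `std::sync::mpsc` and threads.

```rust
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical, simplified holder for the stored shutdown sender.
#[derive(Clone, Default)]
pub struct ShutdownSlot {
    tx: Arc<Mutex<Option<Sender<()>>>>,
}

impl ShutdownSlot {
    // Create the channel, keep the sender, and hand the receiver to the caller.
    pub fn create_shutdown_channel(&self) -> Receiver<()> {
        let (tx, rx) = channel();
        *self.tx.lock().unwrap() = Some(tx);
        rx
    }

    // Send the shutdown signal if a sender was stored.
    // Returns true only when a signal was actually delivered to the channel.
    pub fn signal_shutdown(&self) -> bool {
        match self.tx.lock().unwrap().take() {
            Some(tx) => tx.send(()).is_ok(),
            None => false,
        }
    }
}

// Spawn a worker that counts loop iterations until the shutdown signal arrives.
pub fn spawn_worker(shutdown_rx: Receiver<()>) -> JoinHandle<u64> {
    thread::spawn(move || {
        let mut iterations = 0u64;
        loop {
            match shutdown_rx.try_recv() {
                // A signal, or a dropped sender, ends the loop.
                Ok(()) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {
                    iterations += 1;
                    thread::sleep(Duration::from_millis(1));
                }
            }
        }
        iterations
    })
}
```

Creating the channel first matters because the worker needs the receiver at spawn time; signalling before a channel exists is a no-op in this sketch.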
pub fn get_queries(&self) -> &[String]
Get the query IDs
pub async fn get_status(&self) -> ComponentStatus
Get current status.
pub fn status_handle(&self) -> ComponentStatusHandle
Returns a cloneable ComponentStatusHandle for use in spawned tasks.
The handle can both read and write the component’s status and automatically
notifies the graph on every status change (after initialize()).
pub async fn set_status(&self, status: ComponentStatus, message: Option<String>)
Set the component’s status: updates local state and notifies the graph.
This is the single canonical way to change a reaction’s status.
pub async fn enqueue_query_result(&self, result: QueryResult) -> Result<()>
Enqueue a query result for processing.
The host calls this to forward query results to the reaction’s priority queue. Results are processed in timestamp order by the reaction’s processing task.
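Timestamp-ordered processing can be illustrated with a min-heap. The sketch below is not the crate's PriorityQueue: `TimedResult` and `TimestampQueue` are hypothetical stand-ins, and the field names are assumptions chosen only to show that results come out in timestamp order regardless of arrival order.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical stand-in for QueryResult with only the fields needed here.
// The derived ordering compares timestamp_ms first, then query_id.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimedResult {
    pub timestamp_ms: u64,
    pub query_id: String,
}

// Min-heap keyed on timestamp: Reverse turns the max-heap into a min-heap.
#[derive(Default)]
pub struct TimestampQueue {
    heap: BinaryHeap<Reverse<TimedResult>>,
}

impl TimestampQueue {
    // Add a result in whatever order it arrives.
    pub fn enqueue(&mut self, result: TimedResult) {
        self.heap.push(Reverse(result));
    }

    // Remove and return the result with the smallest timestamp, if any.
    pub fn dequeue(&mut self) -> Option<TimedResult> {
        self.heap.pop().map(|Reverse(r)| r)
    }
}
```

Enqueuing results stamped 30, 10, and 20 and then dequeuing until empty yields them in the order 10, 20, 30.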
pub async fn read_checkpoint(&self, query_id: &str) -> Result<Option<ReactionCheckpoint>>
Read the persisted checkpoint for a single query subscription.
Returns Ok(None) if no checkpoint exists (fresh start).
Errors propagate from the state store or deserialization.
pub async fn read_all_checkpoints(&self) -> Result<HashMap<String, ReactionCheckpoint>>
Read all persisted checkpoints for every query this reaction subscribes to.
Returns a map from query ID to checkpoint. Missing checkpoints (fresh subscriptions) are simply omitted from the result.
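The "missing checkpoints are omitted" behaviour can be sketched against an in-memory map. This is an illustration, not the crate's code: the `Checkpoint` struct (with `sequence` and `config_hash` fields) and the free function taking a HashMap as the store are assumptions, and the real method is async and fallible.

```rust
use std::collections::HashMap;

// Hypothetical checkpoint shape; this documentation only mentions a sequence
// and a config_hash, so those are the only fields modelled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
    pub sequence: u64,
    pub config_hash: u64,
}

// Look up each subscribed query in an in-memory stand-in for the state store.
// Queries without a stored checkpoint are left out of the result, and stored
// entries for queries that are not subscribed are ignored.
pub fn read_all_checkpoints(
    store: &HashMap<String, Checkpoint>,
    queries: &[String],
) -> HashMap<String, Checkpoint> {
    queries
        .iter()
        .filter_map(|q| store.get(q).map(|cp| (q.clone(), cp.clone())))
        .collect()
}
```

A caller can therefore treat an absent key as a fresh subscription rather than as an error.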
pub async fn write_checkpoint(&self, query_id: &str, checkpoint: &ReactionCheckpoint) -> Result<()>
Persist a checkpoint for a single query subscription.
This atomically writes the checkpoint to the state store. It should be called after a batch of query results has been successfully processed.
pub async fn stop_common(&self) -> Result<()>
Perform common cleanup operations
This method handles:
- Sending shutdown signal to processing task (for graceful termination)
- Aborting all subscription forwarder tasks
- Waiting for or aborting the processing task
- Draining the priority queue
pub async fn deprovision_common(&self) -> Result<()>
Clear the reaction’s state store partition.
This is called during deprovision to remove all persisted state
associated with this reaction. Reactions that override deprovision()
can call this to clean up their state store.
pub async fn set_processing_task(&self, task: JoinHandle<()>)
Set the processing task handle
pub async fn run_standard_loop<F, Fut>(&self, shutdown_rx: Receiver<()>, initial_checkpoints: HashMap<String, ReactionCheckpoint>, handler: F) -> Result<()>
Run a standard dequeue → dedup → handler → checkpoint loop.
This is an optional convenience for reactions that follow the common pattern. Reactions needing custom scheduling, batching, or multi-query ordering should implement their own loop.
The loop:
- Dequeues from the priority queue (blocks until available).
- Checks the event’s sequence against the persisted checkpoint — events at or before the checkpoint are silently skipped (dedup).
- Calls handler with the event.
- On success, writes a new checkpoint with the event’s sequence, preserving the config_hash from the initial checkpoint map (or 0 if no prior checkpoint exists for that query).
- Breaks when shutdown_rx fires.
Arguments
- shutdown_rx: receiver created via create_shutdown_channel().
- initial_checkpoints: pre-loaded checkpoint map (from bootstrap orchestration). The loop uses these for dedup and preserves each query’s config_hash when advancing the sequence.
- handler: async function receiving a QueryResult. Return Ok(()) to advance the checkpoint, or Err to leave it unchanged (the event will NOT be retried automatically).
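The dequeue, dedup, handler, checkpoint sequence can be sketched synchronously. This is a simplified model under stated assumptions, not the crate's loop: `Event`, `Checkpoint`, and `run_loop` are hypothetical, the queue is a finite VecDeque that ends the loop when empty (the real loop blocks and exits on shutdown_rx), checkpoints are kept in memory instead of a state store, and dedup is assumed to compare against the checkpoint as it advances.

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical checkpoint shape with the two fields this documentation names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
    pub sequence: u64,
    pub config_hash: u64,
}

// Hypothetical stand-in for QueryResult.
#[derive(Debug, Clone)]
pub struct Event {
    pub query_id: String,
    pub sequence: u64,
}

// Drain the queue, skip events at or before the checkpoint, and advance the
// checkpoint only when the handler succeeds. Returns the final checkpoint map.
pub fn run_loop<F>(
    mut queue: VecDeque<Event>,
    mut checkpoints: HashMap<String, Checkpoint>,
    mut handler: F,
) -> HashMap<String, Checkpoint>
where
    F: FnMut(&Event) -> Result<(), String>,
{
    while let Some(event) = queue.pop_front() {
        // Dedup: silently skip events already covered by the checkpoint.
        if let Some(cp) = checkpoints.get(&event.query_id) {
            if event.sequence <= cp.sequence {
                continue;
            }
        }
        if handler(&event).is_ok() {
            // Preserve the existing config_hash, or use 0 for a new query.
            let config_hash = checkpoints
                .get(&event.query_id)
                .map_or(0, |cp| cp.config_hash);
            checkpoints.insert(
                event.query_id.clone(),
                Checkpoint { sequence: event.sequence, config_hash },
            );
        }
        // On Err the checkpoint stays unchanged and the event is not retried.
    }
    checkpoints
}
```

Because a failed event is dropped rather than retried, a handler that needs at-least-once delivery has to retry internally before returning Err.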