// sayiir_runtime/client.rs

//! Centralised workflow lifecycle client.
//!
//! [`WorkflowClient`] provides a single entry-point for submitting workflows
//! with idempotency (via [`ConflictPolicy`]) and for lifecycle operations
//! (cancel, pause, unpause, send event, status) without requiring a runner
//! or worker.
//!
//! This is the recommended API for the distributed model where a
//! [`PooledWorker`](crate::worker::PooledWorker) executes tasks but a
//! separate process or service needs to submit workflows and control them.
use std::sync::Arc;

use bytes::Bytes;
use sayiir_core::codec::sealed;
use sayiir_core::codec::{Codec, EnvelopeCodec};
use sayiir_core::snapshot::{SignalKind, SignalRequest};
use sayiir_core::task::TaskIdentifier;
use sayiir_core::workflow::{ConflictPolicy, Workflow, WorkflowStatus};
use sayiir_persistence::{SignalStore, SnapshotStore, TaskResultStore};

use crate::error::RuntimeError;
use crate::{PrepareRunOutcome, check_existing_instance, prepare_run};
24
25/// A client for submitting and controlling workflow instances.
26///
27/// Unlike [`CheckpointingRunner`](crate::CheckpointingRunner), the client does
28/// **not** execute tasks — it only creates initial snapshots and stores
29/// lifecycle signals. A [`PooledWorker`](crate::worker::PooledWorker) (or
30/// `CheckpointingRunner::resume`) picks up and executes the work.
31///
32/// # Example
33///
34/// ```rust,no_run
35/// use sayiir_runtime::WorkflowClient;
36/// use sayiir_runtime::persistence::InMemoryBackend;
37/// use sayiir_core::workflow::ConflictPolicy;
38///
39/// let backend = InMemoryBackend::new();
40/// let client = WorkflowClient::new(backend)
41///     .with_conflict_policy(ConflictPolicy::UseExisting);
42/// ```
43pub struct WorkflowClient<B> {
44    backend: Arc<B>,
45    conflict_policy: ConflictPolicy,
46}
47
48impl<B> WorkflowClient<B> {
49    /// Create a new client wrapping the given backend.
50    ///
51    /// The default conflict policy is [`ConflictPolicy::Fail`].
52    pub fn new(backend: B) -> Self {
53        Self {
54            backend: Arc::new(backend),
55            conflict_policy: ConflictPolicy::default(),
56        }
57    }
58
59    /// Create a client from a shared backend reference.
60    ///
61    /// Useful when the same backend is shared with a runner or worker.
62    pub fn from_shared(backend: Arc<B>) -> Self {
63        Self {
64            backend,
65            conflict_policy: ConflictPolicy::default(),
66        }
67    }
68
69    /// Set the conflict policy for duplicate instance IDs.
70    #[must_use]
71    pub fn with_conflict_policy(mut self, policy: ConflictPolicy) -> Self {
72        self.conflict_policy = policy;
73        self
74    }
75
76    /// Get a reference to the backend.
77    #[must_use]
78    pub fn backend(&self) -> &Arc<B> {
79        &self.backend
80    }
81}
82
83impl<B> WorkflowClient<B>
84where
85    B: SnapshotStore + SignalStore,
86{
87    /// Submit a workflow for execution.
88    ///
89    /// Creates an initial snapshot in the backend so that a
90    /// [`PooledWorker`](crate::worker::PooledWorker) can pick it up.
91    /// Does **not** execute any tasks.
92    ///
93    /// Returns `(WorkflowStatus, Option<Bytes>)`:
94    /// - `(InProgress, None)` when a fresh snapshot was created.
95    /// - `(status, output)` when the conflict policy returns an existing instance.
96    ///
97    /// # Errors
98    ///
99    /// Returns [`RuntimeError::InstanceAlreadyExists`] when the policy is `Fail`
100    /// and the instance already exists, or propagates backend I/O errors.
101    pub async fn submit<C, Input, M>(
102        &self,
103        workflow: &Workflow<C, Input, M>,
104        instance_id: impl Into<String>,
105        input: Input,
106    ) -> Result<(WorkflowStatus, Option<Bytes>), RuntimeError>
107    where
108        Input: Send + 'static,
109        M: Send + Sync + 'static,
110        C: Codec + EnvelopeCodec + sealed::EncodeValue<Input> + 'static,
111    {
112        let instance_id = instance_id.into();
113        let definition_hash = workflow.definition_hash().to_string();
114        let conflict_policy = self.conflict_policy;
115
116        // Phase 1: check for existing instance before encoding input.
117        if let Some(early) = check_existing_instance(
118            &instance_id,
119            &definition_hash,
120            self.backend.as_ref(),
121            conflict_policy,
122        )
123        .await?
124        {
125            return Ok(early);
126        }
127
128        // Phase 2: encode input and create snapshot.
129        let input_bytes = workflow.context().codec.encode(&input)?;
130        let first_task = workflow.continuation().first_task_hint();
131
132        match prepare_run(
133            instance_id,
134            definition_hash,
135            input_bytes,
136            first_task,
137            self.backend.as_ref(),
138            conflict_policy,
139            true, // prechecked — check_existing_instance already ran
140        )
141        .await?
142        {
143            PrepareRunOutcome::Fresh(_) => Ok((WorkflowStatus::InProgress, None)),
144            PrepareRunOutcome::ExistingStatus(status, output) => Ok((status, output)),
145        }
146    }
147
148    /// Request cancellation of a workflow instance.
149    ///
150    /// Stores a cancel signal in the backend. The worker picks it up
151    /// at the next task boundary.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the signal cannot be stored.
156    pub async fn cancel(
157        &self,
158        instance_id: &str,
159        reason: Option<String>,
160        cancelled_by: Option<String>,
161    ) -> Result<(), RuntimeError> {
162        self.backend
163            .store_signal(
164                instance_id,
165                SignalKind::Cancel,
166                SignalRequest::new(reason, cancelled_by),
167            )
168            .await?;
169        Ok(())
170    }
171
172    /// Request pausing of a workflow instance.
173    ///
174    /// Stores a pause signal in the backend. The worker picks it up
175    /// at the next task boundary.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the signal cannot be stored.
180    pub async fn pause(
181        &self,
182        instance_id: &str,
183        reason: Option<String>,
184        paused_by: Option<String>,
185    ) -> Result<(), RuntimeError> {
186        self.backend
187            .store_signal(
188                instance_id,
189                SignalKind::Pause,
190                SignalRequest::new(reason, paused_by),
191            )
192            .await?;
193        Ok(())
194    }
195
196    /// Unpause a paused workflow instance.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the workflow is not found or not paused.
201    pub async fn unpause(&self, instance_id: &str) -> Result<(), RuntimeError> {
202        self.backend.unpause(instance_id).await?;
203        Ok(())
204    }
205
206    /// Send an external event (signal) to a workflow instance.
207    ///
208    /// The payload is buffered in FIFO order per (`instance_id`, `signal_name`).
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the event cannot be stored.
213    pub async fn send_event(
214        &self,
215        instance_id: &str,
216        signal_name: &str,
217        payload: Bytes,
218    ) -> Result<(), RuntimeError> {
219        self.backend
220            .send_event(instance_id, signal_name, payload)
221            .await?;
222        Ok(())
223    }
224
225    /// Get the current status of a workflow instance.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if the snapshot cannot be loaded.
230    pub async fn status(&self, instance_id: &str) -> Result<WorkflowStatus, RuntimeError> {
231        let snapshot = self.backend.load_snapshot(instance_id).await?;
232        Ok(snapshot.state.as_status())
233    }
234}
235
236impl<B> WorkflowClient<B>
237where
238    B: SnapshotStore + SignalStore + TaskResultStore,
239{
240    /// Get a single task result from a workflow instance.
241    ///
242    /// Returns `Ok(Some(bytes))` if the task has completed, `Ok(None)` if the
243    /// task was never executed. For completed/failed workflows, the result is
244    /// recovered from the backend's history or cache.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the snapshot cannot be loaded.
249    pub async fn get_task_result(
250        &self,
251        instance_id: &str,
252        task_id: &str,
253    ) -> Result<Option<Bytes>, RuntimeError> {
254        Ok(self.backend.load_task_result(instance_id, task_id).await?)
255    }
256
257    /// Type-safe variant of [`get_task_result`](Self::get_task_result) that
258    /// derives the `task_id` from a [`TaskIdentifier`] implementor (e.g. a
259    /// `#[task]`-generated struct).
260    ///
261    /// ```rust,ignore
262    /// let result = client.get_task_result_of::<ValidateOrderTask>("order-42").await?;
263    /// ```
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the snapshot cannot be loaded.
268    pub async fn get_task_result_of<T: TaskIdentifier>(
269        &self,
270        instance_id: &str,
271    ) -> Result<Option<Bytes>, RuntimeError> {
272        self.get_task_result(instance_id, T::task_id()).await
273    }
274}