sayiir_runtime/client.rs
//! Centralised workflow lifecycle client.
//!
//! [`WorkflowClient`] provides a single entry-point for submitting workflows
//! with idempotency (via [`ConflictPolicy`]) and for lifecycle operations
//! (cancel, pause, unpause, send event, status) without requiring a runner
//! or worker.
//!
//! This is the recommended API for the distributed model where a
//! [`PooledWorker`](crate::worker::PooledWorker) executes tasks but a
//! separate process or service needs to submit workflows and control them.

12use std::sync::Arc;
13
14use bytes::Bytes;
15use sayiir_core::codec::sealed;
16use sayiir_core::codec::{Codec, EnvelopeCodec};
17use sayiir_core::snapshot::{SignalKind, SignalRequest};
18use sayiir_core::task::TaskIdentifier;
19use sayiir_core::workflow::{ConflictPolicy, Workflow, WorkflowStatus};
20use sayiir_persistence::{SignalStore, SnapshotStore, TaskResultStore};
21
22use crate::error::RuntimeError;
23use crate::{PrepareRunOutcome, check_existing_instance, prepare_run};
24
/// A client for submitting and controlling workflow instances.
///
/// Unlike [`CheckpointingRunner`](crate::CheckpointingRunner), the client does
/// **not** execute tasks — it only creates initial snapshots and stores
/// lifecycle signals. A [`PooledWorker`](crate::worker::PooledWorker) (or
/// `CheckpointingRunner::resume`) picks up and executes the work.
///
/// # Example
///
/// ```rust,no_run
/// use sayiir_runtime::WorkflowClient;
/// use sayiir_runtime::persistence::InMemoryBackend;
/// use sayiir_core::workflow::ConflictPolicy;
///
/// let backend = InMemoryBackend::new();
/// let client = WorkflowClient::new(backend)
///     .with_conflict_policy(ConflictPolicy::UseExisting);
/// ```
pub struct WorkflowClient<B> {
    // Shared (`Arc`) so the same backend instance can also be handed to a
    // runner or worker process — see `from_shared` and `backend`.
    backend: Arc<B>,
    // Applied on every `submit` to resolve duplicate instance IDs.
    // Initialised to `ConflictPolicy::default()` (documented as `Fail`).
    conflict_policy: ConflictPolicy,
}
47
48impl<B> WorkflowClient<B> {
49 /// Create a new client wrapping the given backend.
50 ///
51 /// The default conflict policy is [`ConflictPolicy::Fail`].
52 pub fn new(backend: B) -> Self {
53 Self {
54 backend: Arc::new(backend),
55 conflict_policy: ConflictPolicy::default(),
56 }
57 }
58
59 /// Create a client from a shared backend reference.
60 ///
61 /// Useful when the same backend is shared with a runner or worker.
62 pub fn from_shared(backend: Arc<B>) -> Self {
63 Self {
64 backend,
65 conflict_policy: ConflictPolicy::default(),
66 }
67 }
68
69 /// Set the conflict policy for duplicate instance IDs.
70 #[must_use]
71 pub fn with_conflict_policy(mut self, policy: ConflictPolicy) -> Self {
72 self.conflict_policy = policy;
73 self
74 }
75
76 /// Get a reference to the backend.
77 #[must_use]
78 pub fn backend(&self) -> &Arc<B> {
79 &self.backend
80 }
81}
82
83impl<B> WorkflowClient<B>
84where
85 B: SnapshotStore + SignalStore,
86{
87 /// Submit a workflow for execution.
88 ///
89 /// Creates an initial snapshot in the backend so that a
90 /// [`PooledWorker`](crate::worker::PooledWorker) can pick it up.
91 /// Does **not** execute any tasks.
92 ///
93 /// Returns `(WorkflowStatus, Option<Bytes>)`:
94 /// - `(InProgress, None)` when a fresh snapshot was created.
95 /// - `(status, output)` when the conflict policy returns an existing instance.
96 ///
97 /// # Errors
98 ///
99 /// Returns [`RuntimeError::InstanceAlreadyExists`] when the policy is `Fail`
100 /// and the instance already exists, or propagates backend I/O errors.
101 pub async fn submit<C, Input, M>(
102 &self,
103 workflow: &Workflow<C, Input, M>,
104 instance_id: impl Into<String>,
105 input: Input,
106 ) -> Result<(WorkflowStatus, Option<Bytes>), RuntimeError>
107 where
108 Input: Send + 'static,
109 M: Send + Sync + 'static,
110 C: Codec + EnvelopeCodec + sealed::EncodeValue<Input> + 'static,
111 {
112 let instance_id = instance_id.into();
113 let definition_hash = workflow.definition_hash().to_string();
114 let conflict_policy = self.conflict_policy;
115
116 // Phase 1: check for existing instance before encoding input.
117 if let Some(early) = check_existing_instance(
118 &instance_id,
119 &definition_hash,
120 self.backend.as_ref(),
121 conflict_policy,
122 )
123 .await?
124 {
125 return Ok(early);
126 }
127
128 // Phase 2: encode input and create snapshot.
129 let input_bytes = workflow.context().codec.encode(&input)?;
130 let first_task = workflow.continuation().first_task_hint();
131
132 match prepare_run(
133 instance_id,
134 definition_hash,
135 input_bytes,
136 first_task,
137 self.backend.as_ref(),
138 conflict_policy,
139 true, // prechecked — check_existing_instance already ran
140 )
141 .await?
142 {
143 PrepareRunOutcome::Fresh(_) => Ok((WorkflowStatus::InProgress, None)),
144 PrepareRunOutcome::ExistingStatus(status, output) => Ok((status, output)),
145 }
146 }
147
148 /// Request cancellation of a workflow instance.
149 ///
150 /// Stores a cancel signal in the backend. The worker picks it up
151 /// at the next task boundary.
152 ///
153 /// # Errors
154 ///
155 /// Returns an error if the signal cannot be stored.
156 pub async fn cancel(
157 &self,
158 instance_id: &str,
159 reason: Option<String>,
160 cancelled_by: Option<String>,
161 ) -> Result<(), RuntimeError> {
162 self.backend
163 .store_signal(
164 instance_id,
165 SignalKind::Cancel,
166 SignalRequest::new(reason, cancelled_by),
167 )
168 .await?;
169 Ok(())
170 }
171
172 /// Request pausing of a workflow instance.
173 ///
174 /// Stores a pause signal in the backend. The worker picks it up
175 /// at the next task boundary.
176 ///
177 /// # Errors
178 ///
179 /// Returns an error if the signal cannot be stored.
180 pub async fn pause(
181 &self,
182 instance_id: &str,
183 reason: Option<String>,
184 paused_by: Option<String>,
185 ) -> Result<(), RuntimeError> {
186 self.backend
187 .store_signal(
188 instance_id,
189 SignalKind::Pause,
190 SignalRequest::new(reason, paused_by),
191 )
192 .await?;
193 Ok(())
194 }
195
196 /// Unpause a paused workflow instance.
197 ///
198 /// # Errors
199 ///
200 /// Returns an error if the workflow is not found or not paused.
201 pub async fn unpause(&self, instance_id: &str) -> Result<(), RuntimeError> {
202 self.backend.unpause(instance_id).await?;
203 Ok(())
204 }
205
206 /// Send an external event (signal) to a workflow instance.
207 ///
208 /// The payload is buffered in FIFO order per (`instance_id`, `signal_name`).
209 ///
210 /// # Errors
211 ///
212 /// Returns an error if the event cannot be stored.
213 pub async fn send_event(
214 &self,
215 instance_id: &str,
216 signal_name: &str,
217 payload: Bytes,
218 ) -> Result<(), RuntimeError> {
219 self.backend
220 .send_event(instance_id, signal_name, payload)
221 .await?;
222 Ok(())
223 }
224
225 /// Get the current status of a workflow instance.
226 ///
227 /// # Errors
228 ///
229 /// Returns an error if the snapshot cannot be loaded.
230 pub async fn status(&self, instance_id: &str) -> Result<WorkflowStatus, RuntimeError> {
231 let snapshot = self.backend.load_snapshot(instance_id).await?;
232 Ok(snapshot.state.as_status())
233 }
234}
235
236impl<B> WorkflowClient<B>
237where
238 B: SnapshotStore + SignalStore + TaskResultStore,
239{
240 /// Get a single task result from a workflow instance.
241 ///
242 /// Returns `Ok(Some(bytes))` if the task has completed, `Ok(None)` if the
243 /// task was never executed. For completed/failed workflows, the result is
244 /// recovered from the backend's history or cache.
245 ///
246 /// # Errors
247 ///
248 /// Returns an error if the snapshot cannot be loaded.
249 pub async fn get_task_result(
250 &self,
251 instance_id: &str,
252 task_id: &str,
253 ) -> Result<Option<Bytes>, RuntimeError> {
254 Ok(self.backend.load_task_result(instance_id, task_id).await?)
255 }
256
257 /// Type-safe variant of [`get_task_result`](Self::get_task_result) that
258 /// derives the `task_id` from a [`TaskIdentifier`] implementor (e.g. a
259 /// `#[task]`-generated struct).
260 ///
261 /// ```rust,ignore
262 /// let result = client.get_task_result_of::<ValidateOrderTask>("order-42").await?;
263 /// ```
264 ///
265 /// # Errors
266 ///
267 /// Returns an error if the snapshot cannot be loaded.
268 pub async fn get_task_result_of<T: TaskIdentifier>(
269 &self,
270 instance_id: &str,
271 ) -> Result<Option<Bytes>, RuntimeError> {
272 self.get_task_result(instance_id, T::task_id()).await
273 }
274}