Skip to main content

aion_client/
ops.rs

1//! start/signal/query/cancel/list/describe over the transport.
2
3use std::num::NonZeroU64;
4use std::time::Duration;
5
6use aion_core::{Event, Payload, RunId, WorkflowFilter, WorkflowId, WorkflowSummary};
7use aion_proto::{
8    ProtoCancelRequest, ProtoDescribeWorkflowRequest, ProtoListWorkflowsRequest, ProtoPayload,
9    ProtoQueryRequest, ProtoRunId, ProtoSignalRequest, ProtoStartWorkflowRequest, ProtoWorkflowId,
10    WireError, decode_core_value, decode_event, decode_workflow_summary, encode_core_value,
11    proto_query_response,
12};
13use aion_store::visibility::ListWorkflowsFilter;
14
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17
18use crate::client::Client;
19use crate::error::ClientError;
20use crate::handle::WorkflowHandle;
21use crate::payload::{from_payload, to_payload};
22use crate::stream::{EventStream, SubscribeTarget, event_stream, event_stream_from};
23
/// Per-call options for [`Client::start`].
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct StartOptions {
    /// Namespace to use for this start request instead of the client's own;
    /// `None` keeps the client namespace.
    pub namespace: Option<String>,
    /// Caller-chosen idempotency key that makes a local retry of the same
    /// start request safe to replay.
    ///
    /// The AW protobuf does not carry an idempotency field yet, so the key is
    /// enforced inside the SDK rather than through a client-invented wire
    /// field. Presenting a key that was already used for a different start
    /// request yields [`ClientError::AlreadyExists`]; an empty key is rejected
    /// with [`ClientError::InvalidArgument`].
    pub idempotency_key: Option<String>,
}
37
/// Paging parameters for [`Client::list`].
///
/// Only `request_id` travels on the current AW protobuf (inside the filter
/// envelope). `limit` and `cursor` have no wire representation yet, so setting
/// either one produces [`ClientError::InvalidArgument`] rather than being
/// dropped without notice.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ListPage {
    /// Caller-supplied request identifier, forwarded in the filter envelope.
    pub request_id: Option<String>,
    /// Desired page size; reserved by the contract and currently rejected
    /// when set.
    pub limit: Option<usize>,
    /// Continuation cursor; reserved by the contract and currently rejected
    /// when set.
    pub cursor: Option<String>,
}
52
53/// Workflow detail returned by [`Client::describe`].
54#[derive(Clone, Debug, PartialEq)]
55pub struct WorkflowDescription {
56    /// Lightweight workflow summary reused from `aion-core`.
57    pub summary: WorkflowSummary,
58    /// Optional event history when the server includes it.
59    pub history: Vec<Event>,
60}
61
62impl Client {
63    /// Starts a workflow and returns the assigned workflow and run identifiers.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`ClientError`] when transport, server, or response conversion fails.
68    pub async fn start(
69        &self,
70        workflow_type: impl Into<String>,
71        input: Payload,
72        opts: StartOptions,
73    ) -> Result<WorkflowHandle, ClientError> {
74        validate_start_options(&opts)?;
75        let idempotency_key = opts.idempotency_key.clone();
76        let namespace = operation_namespace(self, opts.namespace);
77        let workflow_type = workflow_type.into();
78        let fingerprint = idempotency_key.as_ref().map(|key| {
79            StartFingerprint::new(
80                namespace.clone(),
81                workflow_type.clone(),
82                &input,
83                key.clone(),
84            )
85        });
86        if let Some(fingerprint) = &fingerprint {
87            if let Some(handle) = self.cached_start(fingerprint).await? {
88                return Ok(handle);
89            }
90        }
91        let response = self
92            .transport
93            .start_workflow(ProtoStartWorkflowRequest {
94                namespace,
95                workflow_type,
96                input: Some(ProtoPayload::from(input)),
97            })
98            .await?;
99        let workflow_id = decode_required_workflow_id(response.workflow_id, "start response")?;
100        let run_id = decode_required_run_id(response.run_id, "start response")?;
101        let handle = WorkflowHandle::from_ids(self.clone(), workflow_id, run_id);
102        if let Some(fingerprint) = fingerprint {
103            self.record_start(fingerprint, handle.clone()).await?;
104        }
105        Ok(handle)
106    }
107
108    /// Starts a workflow after serializing `input` as JSON.
109    ///
110    /// # Errors
111    ///
112    /// Returns [`ClientError::InvalidArgument`] when serialization fails, or the
113    /// delegated start error otherwise.
114    pub async fn start_typed<T>(
115        &self,
116        workflow_type: impl Into<String>,
117        input: &T,
118        opts: StartOptions,
119    ) -> Result<WorkflowHandle, ClientError>
120    where
121        T: Serialize + ?Sized,
122    {
123        self.start(workflow_type, to_payload(input)?, opts).await
124    }
125
126    /// Sends a signal to the latest run, or to `run_id` when supplied.
127    ///
128    /// # Errors
129    ///
130    /// Returns [`ClientError`] when transport, server, or request conversion fails.
131    pub async fn signal(
132        &self,
133        workflow_id: &WorkflowId,
134        run_id: Option<&RunId>,
135        name: impl Into<String>,
136        payload: Payload,
137    ) -> Result<(), ClientError> {
138        self.transport
139            .signal(ProtoSignalRequest {
140                namespace: self.namespace().to_owned(),
141                workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
142                run_id: run_id.cloned().map(ProtoRunId::from),
143                signal_name: name.into(),
144                payload: Some(ProtoPayload::from(payload)),
145            })
146            .await?;
147        Ok(())
148    }
149
150    /// Serializes `value` as JSON and sends it as a signal payload.
151    ///
152    /// # Errors
153    ///
154    /// Returns [`ClientError::InvalidArgument`] when serialization fails, or the
155    /// delegated signal error otherwise.
156    pub async fn signal_typed<T>(
157        &self,
158        workflow_id: &WorkflowId,
159        run_id: Option<&RunId>,
160        name: impl Into<String>,
161        value: &T,
162    ) -> Result<(), ClientError>
163    where
164        T: Serialize + ?Sized,
165    {
166        self.signal(workflow_id, run_id, name, to_payload(value)?)
167            .await
168    }
169
170    /// Queries the latest run, or `run_id` when supplied, with a local deadline.
171    ///
172    /// The current AW protobuf does not yet carry query argument payloads, so a
173    /// non-empty `args` payload returns [`ClientError::InvalidArgument`] instead
174    /// of being silently dropped.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`ClientError::QueryTimeout`] when `deadline` elapses.
179    pub async fn query(
180        &self,
181        workflow_id: &WorkflowId,
182        run_id: Option<&RunId>,
183        name: impl Into<String>,
184        args: Payload,
185        deadline: Duration,
186    ) -> Result<Payload, ClientError> {
187        validate_query_args(&args)?;
188        let response = tokio::time::timeout(
189            deadline,
190            self.transport.query(ProtoQueryRequest {
191                namespace: self.namespace().to_owned(),
192                workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
193                run_id: run_id.cloned().map(ProtoRunId::from),
194                query_name: name.into(),
195            }),
196        )
197        .await
198        .map_err(|_| {
199            ClientError::query_timeout(format!(
200                "query deadline of {deadline:?} elapsed before the server replied"
201            ))
202        })??;
203
204        match response.outcome {
205            Some(proto_query_response::Outcome::Result(payload)) => {
206                Payload::try_from(payload).map_err(ClientError::from_wire_error)
207            }
208            Some(proto_query_response::Outcome::Error(error)) => Err(query_error(error)),
209            None => Err(ClientError::server("query response outcome is missing")),
210        }
211    }
212
213    /// Serializes `args` as JSON, queries a workflow, and deserializes the JSON result.
214    ///
215    /// # Errors
216    ///
217    /// Returns [`ClientError::InvalidArgument`] when serialization or result
218    /// decoding fails, or the delegated query error otherwise.
219    pub async fn query_typed<A, R>(
220        &self,
221        workflow_id: &WorkflowId,
222        run_id: Option<&RunId>,
223        name: impl Into<String>,
224        args: &A,
225        deadline: Duration,
226    ) -> Result<R, ClientError>
227    where
228        A: Serialize + ?Sized,
229        R: DeserializeOwned,
230    {
231        let payload = self
232            .query(
233                workflow_id,
234                run_id,
235                name,
236                query_args_payload(args)?,
237                deadline,
238            )
239            .await?;
240        from_payload(&payload)
241    }
242
243    /// Requests cancellation of the latest run, or `run_id` when supplied.
244    ///
245    /// Success means the server accepted the cancellation request; it is not a
246    /// confirmation that the workflow has reached a terminal cancelled state.
247    ///
248    /// # Errors
249    ///
250    /// Returns [`ClientError`] when transport, server, or request conversion fails.
251    pub async fn cancel(
252        &self,
253        workflow_id: &WorkflowId,
254        run_id: Option<&RunId>,
255        reason: impl Into<String>,
256    ) -> Result<(), ClientError> {
257        self.transport
258            .cancel(ProtoCancelRequest {
259                namespace: self.namespace().to_owned(),
260                workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
261                run_id: run_id.cloned().map(ProtoRunId::from),
262                reason: reason.into(),
263            })
264            .await?;
265        Ok(())
266    }
267
268    /// Lists workflows matching a filter.
269    ///
270    /// # Errors
271    ///
272    /// Returns [`ClientError`] when transport, server, or response conversion fails.
273    pub async fn list(
274        &self,
275        filter: &WorkflowFilter,
276        page: ListPage,
277    ) -> Result<Vec<WorkflowSummary>, ClientError> {
278        validate_list_page(&page)?;
279        let namespace = self.namespace().to_owned();
280        let filter = workflow_filter_to_visibility(filter)?;
281        let filter = encode_core_value(namespace.clone(), page.request_id, &filter)
282            .map_err(ClientError::from_wire_error)?;
283        let response = self
284            .transport
285            .list_workflows(ProtoListWorkflowsRequest {
286                namespace,
287                filter: Some(filter),
288            })
289            .await?;
290
291        response
292            .summaries
293            .iter()
294            .map(decode_visibility_summary)
295            .map(|result| result.map_err(ClientError::from_wire_error))
296            .collect()
297    }
298
299    /// Describes the latest run, or `run_id` when supplied.
300    ///
301    /// # Errors
302    ///
303    /// Returns [`ClientError`] when transport, server, or response conversion fails.
304    pub async fn describe(
305        &self,
306        workflow_id: &WorkflowId,
307        run_id: Option<&RunId>,
308    ) -> Result<WorkflowDescription, ClientError> {
309        let response = self
310            .transport
311            .describe_workflow(ProtoDescribeWorkflowRequest {
312                namespace: self.namespace().to_owned(),
313                workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
314                run_id: run_id.cloned().map(ProtoRunId::from),
315                include_history: true,
316            })
317            .await?;
318        let summary = response
319            .summary
320            .as_ref()
321            .ok_or_else(|| ClientError::server("describe response summary is missing"))
322            .and_then(|summary| {
323                decode_workflow_summary(summary).map_err(ClientError::from_wire_error)
324            })?;
325        let history = response
326            .history
327            .iter()
328            .map(decode_event)
329            .map(|result| result.map_err(ClientError::from_wire_error))
330            .collect::<Result<Vec<_>, _>>()?;
331        Ok(WorkflowDescription { summary, history })
332    }
333
334    /// Subscribes to events for a workflow.
335    #[must_use]
336    pub fn subscribe_workflow(&self, workflow_id: &WorkflowId) -> EventStream {
337        event_stream(
338            self.transport.clone(),
339            self.namespace().to_owned(),
340            SubscribeTarget::Workflow {
341                workflow_id: workflow_id.clone(),
342            },
343        )
344    }
345
346    /// Subscribes to events for a workflow, attaching from an explicit
347    /// per-workflow sequence cursor.
348    ///
349    /// `resume_from` is the first sequence number wanted (`resume_from_seq`
350    /// on the wire); `1` replays the workflow's full recorded history before
351    /// splicing into the live stream, gap-free and duplicate-free.
352    #[must_use]
353    pub fn subscribe_workflow_from(
354        &self,
355        workflow_id: &WorkflowId,
356        resume_from: NonZeroU64,
357    ) -> EventStream {
358        event_stream_from(
359            self.transport.clone(),
360            self.namespace().to_owned(),
361            workflow_id.clone(),
362            resume_from,
363        )
364    }
365
366    /// Subscribes to events selected by the supplied workflow filter.
367    #[must_use]
368    pub fn subscribe(&self, filter: WorkflowFilter) -> EventStream {
369        event_stream(
370            self.transport.clone(),
371            self.namespace().to_owned(),
372            SubscribeTarget::Filtered { filter },
373        )
374    }
375
376    /// Subscribes to every event visible to this client namespace.
377    #[must_use]
378    pub fn subscribe_firehose(&self) -> EventStream {
379        event_stream(
380            self.transport.clone(),
381            self.namespace().to_owned(),
382            SubscribeTarget::Firehose,
383        )
384    }
385}
386
387#[derive(Clone, Debug, PartialEq, Eq)]
388pub(crate) struct StartFingerprint {
389    namespace: String,
390    workflow_type: String,
391    content_type: aion_core::ContentType,
392    bytes: Vec<u8>,
393    idempotency_key: String,
394}
395
396impl StartFingerprint {
397    fn new(
398        namespace: String,
399        workflow_type: String,
400        input: &Payload,
401        idempotency_key: String,
402    ) -> Self {
403        Self {
404            namespace,
405            workflow_type,
406            content_type: input.content_type().clone(),
407            bytes: input.bytes().to_vec(),
408            idempotency_key,
409        }
410    }
411
412    pub(crate) fn key(&self) -> &str {
413        &self.idempotency_key
414    }
415}
416
417fn operation_namespace(client: &Client, namespace: Option<String>) -> String {
418    namespace.unwrap_or_else(|| client.namespace().to_owned())
419}
420
421fn validate_start_options(opts: &StartOptions) -> Result<(), ClientError> {
422    if opts
423        .idempotency_key
424        .as_ref()
425        .is_some_and(std::string::String::is_empty)
426    {
427        return Err(ClientError::invalid_argument(
428            "idempotency_key must not be empty",
429        ));
430    }
431    Ok(())
432}
433
434fn validate_query_args(args: &Payload) -> Result<(), ClientError> {
435    if !args.bytes().is_empty() {
436        return Err(ClientError::invalid_argument(
437            "query arguments are not carried by the current wire contract; \
438             pass an empty payload",
439        ));
440    }
441    Ok(())
442}
443
444fn query_args_payload<T>(args: &T) -> Result<Payload, ClientError>
445where
446    T: Serialize + ?Sized,
447{
448    let payload = to_payload(args)?;
449    if payload.bytes() == b"null" {
450        Ok(Payload::new(payload.content_type().clone(), Vec::new()))
451    } else {
452        Ok(payload)
453    }
454}
455
456fn validate_list_page(page: &ListPage) -> Result<(), ClientError> {
457    if page.limit.is_some() || page.cursor.is_some() {
458        return Err(ClientError::invalid_argument(
459            "list pagination limit/cursor are reserved by the contract and \
460             not yet carried by the wire",
461        ));
462    }
463    Ok(())
464}
465
466fn workflow_filter_to_visibility(
467    filter: &WorkflowFilter,
468) -> Result<ListWorkflowsFilter, ClientError> {
469    if filter.parent.is_some() {
470        return Err(ClientError::invalid_argument(
471            "parent workflow filters are not carried by the visibility wire contract",
472        ));
473    }
474
475    Ok(ListWorkflowsFilter {
476        workflow_type: filter.workflow_type.clone(),
477        status: filter.status,
478        started_after: filter.started_after,
479        started_before: filter.started_before,
480        ..ListWorkflowsFilter::default()
481    })
482}
483
484fn decode_visibility_summary(
485    envelope: &aion_proto::WireEnvelope,
486) -> Result<WorkflowSummary, WireError> {
487    let summary = decode_core_value::<aion_store::visibility::WorkflowSummary>(envelope)?;
488    Ok(WorkflowSummary {
489        workflow_id: summary.workflow_id,
490        workflow_type: summary.workflow_type,
491        status: summary.status,
492        started_at: summary.start_time,
493        ended_at: summary.close_time,
494        parent: None,
495    })
496}
497
498fn decode_required_workflow_id(
499    value: Option<ProtoWorkflowId>,
500    context: &str,
501) -> Result<WorkflowId, ClientError> {
502    value
503        .ok_or_else(|| ClientError::server(format!("{context} workflow id is missing")))?
504        .try_into()
505        .map_err(ClientError::from_wire_error)
506}
507
508fn decode_required_run_id(value: Option<ProtoRunId>, context: &str) -> Result<RunId, ClientError> {
509    value
510        .ok_or_else(|| ClientError::server(format!("{context} run id is missing")))?
511        .try_into()
512        .map_err(ClientError::from_wire_error)
513}
514
515/// Maps a `QueryResponse.error` payload through the shared wire taxonomy.
516///
517/// The server reports query-handler application failures with the dedicated
518/// `query_failed` wire code, so the shared map yields [`ClientError::QueryFailed`]
519/// directly; `backend` stays an unexpected server fault.
520fn query_error(error: aion_proto::ProtoWireError) -> ClientError {
521    ClientError::from_proto_wire_error(error)
522}
523
524#[cfg(test)]
525mod tests {
526    use std::sync::Arc;
527    use std::time::Duration;
528
529    use aion_core::{ContentType, Payload, WorkflowFilter, WorkflowId, WorkflowStatus};
530    use aion_proto::{
531        ProtoCancelResponse, ProtoDescribeWorkflowResponse, ProtoListWorkflowsResponse,
532        ProtoQueryResponse, ProtoRunId, ProtoSignalResponse, ProtoStartWorkflowResponse,
533        ProtoWorkflowId, WireError, encode_core_value, encode_workflow_summary,
534        proto_query_response,
535    };
536    use async_trait::async_trait;
537    use chrono::Utc;
538    use futures::StreamExt;
539    use futures::stream;
540    use tokio::sync::Mutex;
541
542    use super::{ListPage, StartOptions};
543    use crate::client::{Client, ClientBuilder, ClientConfig};
544    use crate::error::ClientError;
545    use crate::transport::{SubscriptionAttempt, WorkflowTransport};
546
547    #[derive(Default)]
548    struct StubTransport {
549        last_start: Mutex<Option<aion_proto::ProtoStartWorkflowRequest>>,
550        last_signal: Mutex<Option<aion_proto::ProtoSignalRequest>>,
551        last_query: Mutex<Option<aion_proto::ProtoQueryRequest>>,
552        last_cancel: Mutex<Option<aion_proto::ProtoCancelRequest>>,
553        last_list: Mutex<Option<aion_proto::ProtoListWorkflowsRequest>>,
554        last_describe: Mutex<Option<aion_proto::ProtoDescribeWorkflowRequest>>,
555        start_error: Mutex<Option<ClientError>>,
556        signal_error: Mutex<Option<ClientError>>,
557        query_response: Mutex<Option<Result<ProtoQueryResponse, ClientError>>>,
558    }
559
560    #[async_trait]
561    impl WorkflowTransport for StubTransport {
562        async fn start_workflow(
563            &self,
564            request: aion_proto::ProtoStartWorkflowRequest,
565        ) -> Result<ProtoStartWorkflowResponse, ClientError> {
566            *self.last_start.lock().await = Some(request);
567            if let Some(error) = self.start_error.lock().await.take() {
568                return Err(error);
569            }
570            Ok(ProtoStartWorkflowResponse {
571                workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
572                run_id: Some(ProtoRunId::from(run_id())),
573            })
574        }
575
576        async fn signal(
577            &self,
578            request: aion_proto::ProtoSignalRequest,
579        ) -> Result<ProtoSignalResponse, ClientError> {
580            *self.last_signal.lock().await = Some(request);
581            if let Some(error) = self.signal_error.lock().await.take() {
582                return Err(error);
583            }
584            Ok(ProtoSignalResponse {})
585        }
586
587        async fn query(
588            &self,
589            request: aion_proto::ProtoQueryRequest,
590        ) -> Result<ProtoQueryResponse, ClientError> {
591            *self.last_query.lock().await = Some(request);
592            if let Some(response) = self.query_response.lock().await.take() {
593                return response;
594            }
595            Ok(ProtoQueryResponse {
596                outcome: Some(proto_query_response::Outcome::Result(
597                    aion_proto::ProtoPayload::from(payload("result")),
598                )),
599            })
600        }
601
602        async fn cancel(
603            &self,
604            request: aion_proto::ProtoCancelRequest,
605        ) -> Result<ProtoCancelResponse, ClientError> {
606            *self.last_cancel.lock().await = Some(request);
607            Ok(ProtoCancelResponse {})
608        }
609
610        async fn list_workflows(
611            &self,
612            request: aion_proto::ProtoListWorkflowsRequest,
613        ) -> Result<ProtoListWorkflowsResponse, ClientError> {
614            *self.last_list.lock().await = Some(request);
615            Ok(ProtoListWorkflowsResponse {
616                summaries: vec![
617                    encode_core_value("tenant-a", None, &visibility_summary())
618                        .map_err(ClientError::from_wire_error)?,
619                ],
620            })
621        }
622
623        async fn describe_workflow(
624            &self,
625            request: aion_proto::ProtoDescribeWorkflowRequest,
626        ) -> Result<ProtoDescribeWorkflowResponse, ClientError> {
627            *self.last_describe.lock().await = Some(request);
628            Ok(ProtoDescribeWorkflowResponse {
629                summary: Some(
630                    encode_workflow_summary("tenant-a", None, &summary())
631                        .map_err(ClientError::from_wire_error)?,
632                ),
633                history: Vec::new(),
634            })
635        }
636
637        async fn subscribe(
638            &self,
639            _: aion_proto::SubscriptionRequest,
640            _: Option<u64>,
641        ) -> Result<SubscriptionAttempt, ClientError> {
642            Ok(SubscriptionAttempt::new(stream::empty().boxed()))
643        }
644    }
645
646    fn client_with(stub: Arc<StubTransport>) -> Client {
647        Client::from_transport(
648            ClientConfig::from(
649                ClientBuilder::new("http://localhost:50051").with_namespace("tenant-a"),
650            ),
651            stub,
652        )
653    }
654
655    fn workflow_id() -> WorkflowId {
656        WorkflowId::new_v4()
657    }
658
659    fn run_id() -> aion_core::RunId {
660        aion_core::RunId::new_v4()
661    }
662
663    fn payload(label: &str) -> Payload {
664        Payload::new(
665            ContentType::Json,
666            format!("{{\"label\":\"{label}\"}}").into_bytes(),
667        )
668    }
669
670    fn empty_payload() -> Payload {
671        Payload::new(ContentType::Json, Vec::new())
672    }
673
674    fn summary() -> aion_core::WorkflowSummary {
675        aion_core::WorkflowSummary {
676            workflow_id: workflow_id(),
677            workflow_type: String::from("checkout"),
678            status: WorkflowStatus::Running,
679            started_at: Utc::now(),
680            ended_at: None,
681            parent: None,
682        }
683    }
684
685    fn visibility_summary() -> aion_store::visibility::WorkflowSummary {
686        aion_store::visibility::WorkflowSummary {
687            workflow_id: workflow_id(),
688            run_id: run_id(),
689            workflow_type: String::from("checkout"),
690            status: WorkflowStatus::Running,
691            start_time: Utc::now(),
692            close_time: None,
693            search_attributes: std::collections::HashMap::new(),
694        }
695    }
696
697    #[tokio::test]
698    async fn start_maps_request_and_returns_handle() -> Result<(), ClientError> {
699        let stub = Arc::new(StubTransport::default());
700        let client = client_with(Arc::clone(&stub));
701
702        let result = client
703            .start("checkout", payload("input"), StartOptions::default())
704            .await?;
705
706        let recorded = stub.last_start.lock().await.clone();
707        assert!(recorded.is_some());
708        let request = recorded.ok_or_else(|| ClientError::server("missing recorded start"))?;
709        assert_eq!(request.namespace, "tenant-a");
710        assert_eq!(request.workflow_type, "checkout");
711        assert!(request.input.is_some());
712        assert_ne!(result.workflow_id(), &WorkflowId::new(uuid::Uuid::nil()));
713        Ok(())
714    }
715
716    #[tokio::test]
717    async fn start_idempotency_replays_identical_and_rejects_conflicts() -> Result<(), ClientError>
718    {
719        let stub = Arc::new(StubTransport::default());
720        let client = client_with(Arc::clone(&stub));
721        let opts = StartOptions {
722            namespace: None,
723            idempotency_key: Some(String::from("retry-key")),
724        };
725
726        let original = client
727            .start("checkout", payload("input"), opts.clone())
728            .await?;
729        let replayed = client
730            .start("checkout", payload("input"), opts.clone())
731            .await?;
732        let conflict = client.start("checkout", payload("other"), opts).await;
733
734        assert_eq!(replayed, original);
735        assert!(
736            matches!(conflict, Err(ClientError::AlreadyExists { .. })),
737            "got {conflict:?}"
738        );
739        Ok(())
740    }
741
742    #[tokio::test]
743    async fn signal_maps_latest_run_and_error() {
744        let stub = Arc::new(StubTransport::default());
745        *stub.signal_error.lock().await = Some(ClientError::not_found("workflow was not found"));
746        let client = client_with(Arc::clone(&stub));
747        let id = workflow_id();
748
749        let result = client.signal(&id, None, "approve", payload("signal")).await;
750
751        assert_eq!(
752            result,
753            Err(ClientError::not_found("workflow was not found"))
754        );
755        let recorded = stub.last_signal.lock().await.clone();
756        assert!(recorded.is_some());
757        let Some(request) = recorded else {
758            return;
759        };
760        assert!(request.run_id.is_none());
761    }
762
763    #[tokio::test]
764    async fn query_maps_result_error_and_deadline() -> Result<(), ClientError> {
765        let stub = Arc::new(StubTransport::default());
766        *stub.query_response.lock().await = Some(Ok(ProtoQueryResponse {
767            outcome: Some(proto_query_response::Outcome::Error(
768                aion_proto::ProtoWireError::from(WireError::query_timeout("slow")),
769            )),
770        }));
771        let client = client_with(Arc::clone(&stub));
772        let id = workflow_id();
773
774        let result = client
775            .query(
776                &id,
777                Some(&run_id()),
778                "state",
779                empty_payload(),
780                Duration::from_secs(1),
781            )
782            .await;
783        let unsupported_args = client
784            .query(&id, None, "state", payload("args"), Duration::from_secs(1))
785            .await;
786
787        assert_eq!(result, Err(ClientError::query_timeout("slow")));
788        assert!(
789            matches!(unsupported_args, Err(ClientError::InvalidArgument { .. })),
790            "got {unsupported_args:?}"
791        );
792        let recorded = stub.last_query.lock().await.clone();
793        assert!(recorded.is_some());
794        let request = recorded.ok_or_else(|| ClientError::server("missing query"))?;
795        assert!(request.run_id.is_some());
796        Ok(())
797    }
798
799    #[tokio::test]
800    async fn query_failed_outcome_error_maps_to_query_failed() -> Result<(), ClientError> {
801        let stub = Arc::new(StubTransport::default());
802        *stub.query_response.lock().await = Some(Ok(ProtoQueryResponse {
803            outcome: Some(proto_query_response::Outcome::Error(
804                aion_proto::ProtoWireError::from(WireError::query_failed("handler raised")),
805            )),
806        }));
807        let client = client_with(Arc::clone(&stub));
808
809        let result = client
810            .query(
811                &workflow_id(),
812                Some(&run_id()),
813                "state",
814                empty_payload(),
815                Duration::from_secs(1),
816            )
817            .await;
818
819        assert_eq!(result, Err(ClientError::query_failed("handler raised")));
820        Ok(())
821    }
822
823    #[tokio::test]
824    async fn backend_outcome_error_is_a_server_fault_not_query_failed() -> Result<(), ClientError> {
825        // `backend` in QueryResponse.error is an unexpected server fault; the
826        // application-level handler failure has its own `query_failed` code.
827        let stub = Arc::new(StubTransport::default());
828        *stub.query_response.lock().await = Some(Ok(ProtoQueryResponse {
829            outcome: Some(proto_query_response::Outcome::Error(
830                aion_proto::ProtoWireError::from(WireError::backend("store down")),
831            )),
832        }));
833        let client = client_with(Arc::clone(&stub));
834
835        let result = client
836            .query(
837                &workflow_id(),
838                Some(&run_id()),
839                "state",
840                empty_payload(),
841                Duration::from_secs(1),
842            )
843            .await;
844
845        assert_eq!(result, Err(ClientError::server("store down")));
846        Ok(())
847    }
848
849    #[tokio::test]
850    async fn query_typed_decodes_no_arg_query_result() -> Result<(), ClientError> {
851        #[derive(serde::Deserialize, PartialEq, Eq, Debug)]
852        struct QueryResult {
853            label: String,
854        }
855
856        let stub = Arc::new(StubTransport::default());
857        let client = client_with(Arc::clone(&stub));
858        let id = workflow_id();
859
860        let result: QueryResult = client
861            .query_typed(&id, Some(&run_id()), "state", &(), Duration::from_secs(1))
862            .await?;
863
864        assert_eq!(
865            result,
866            QueryResult {
867                label: String::from("result")
868            }
869        );
870        assert!(stub.last_query.lock().await.is_some());
871        Ok(())
872    }
873
874    #[tokio::test]
875    async fn query_typed_rejects_non_empty_args_without_silent_drop() {
876        let stub = Arc::new(StubTransport::default());
877        let client = client_with(Arc::clone(&stub));
878        let id = workflow_id();
879
880        let result = client
881            .query_typed::<_, serde_json::Value>(
882                &id,
883                Some(&run_id()),
884                "state",
885                &serde_json::json!({ "filter": "open" }),
886                Duration::from_secs(1),
887            )
888            .await;
889
890        assert!(
891            matches!(result, Err(ClientError::InvalidArgument { .. })),
892            "got {result:?}"
893        );
894        assert!(stub.last_query.lock().await.is_none());
895    }
896
897    #[tokio::test]
898    async fn cancel_list_and_describe_map_requests() -> Result<(), ClientError> {
899        let stub = Arc::new(StubTransport::default());
900        let client = client_with(Arc::clone(&stub));
901        let id = workflow_id();
902        let run = run_id();
903
904        client.cancel(&id, Some(&run), "not needed").await?;
905        let listed = client
906            .list(&WorkflowFilter::default(), ListPage::default())
907            .await?;
908        let described = client.describe(&id, None).await?;
909
910        assert!(stub.last_cancel.lock().await.is_some());
911        assert!(stub.last_list.lock().await.is_some());
912        let describe = stub
913            .last_describe
914            .lock()
915            .await
916            .clone()
917            .ok_or_else(|| ClientError::server("missing describe"))?;
918        assert!(describe.run_id.is_none());
919        assert!(describe.include_history);
920        assert_eq!(listed.len(), 1);
921        assert_eq!(described.history.len(), 0);
922        Ok(())
923    }
924}