1use std::num::NonZeroU64;
4use std::time::Duration;
5
6use aion_core::{Event, Payload, RunId, WorkflowFilter, WorkflowId, WorkflowSummary};
7use aion_proto::{
8 ProtoCancelRequest, ProtoDescribeWorkflowRequest, ProtoListWorkflowsRequest, ProtoPayload,
9 ProtoQueryRequest, ProtoRunId, ProtoSignalRequest, ProtoStartWorkflowRequest, ProtoWorkflowId,
10 WireError, decode_core_value, decode_event, decode_workflow_summary, encode_core_value,
11 proto_query_response,
12};
13use aion_store::visibility::ListWorkflowsFilter;
14
15use serde::Serialize;
16use serde::de::DeserializeOwned;
17
18use crate::client::Client;
19use crate::error::ClientError;
20use crate::handle::WorkflowHandle;
21use crate::payload::{from_payload, to_payload};
22use crate::stream::{EventStream, SubscribeTarget, event_stream, event_stream_from};
23
/// Per-call options accepted by `Client::start`.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct StartOptions {
    /// Namespace to start the workflow in; `None` falls back to the client's own namespace.
    pub namespace: Option<String>,
    /// Optional idempotency key for the start.
    ///
    /// When set, `Client::start` builds a fingerprint from the namespace, workflow type,
    /// input and this key, and consults the client's start cache before contacting the
    /// transport. An empty string is rejected as an invalid argument.
    pub idempotency_key: Option<String>,
}
37
/// Paging and request metadata accepted by `Client::list`.
///
/// `Client::list` currently rejects any page whose `limit` or `cursor` is set, so only
/// `request_id` has an effect today.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ListPage {
    /// Optional request identifier handed to `encode_core_value` when the filter is encoded.
    pub request_id: Option<String>,
    /// Requested page size; setting it makes `Client::list` fail with an invalid argument.
    pub limit: Option<usize>,
    /// Continuation cursor; setting it makes `Client::list` fail with an invalid argument.
    pub cursor: Option<String>,
}
52
53#[derive(Clone, Debug, PartialEq)]
55pub struct WorkflowDescription {
56 pub summary: WorkflowSummary,
58 pub history: Vec<Event>,
60}
61
62impl Client {
63 pub async fn start(
69 &self,
70 workflow_type: impl Into<String>,
71 input: Payload,
72 opts: StartOptions,
73 ) -> Result<WorkflowHandle, ClientError> {
74 validate_start_options(&opts)?;
75 let idempotency_key = opts.idempotency_key.clone();
76 let namespace = operation_namespace(self, opts.namespace);
77 let workflow_type = workflow_type.into();
78 let fingerprint = idempotency_key.as_ref().map(|key| {
79 StartFingerprint::new(
80 namespace.clone(),
81 workflow_type.clone(),
82 &input,
83 key.clone(),
84 )
85 });
86 if let Some(fingerprint) = &fingerprint {
87 if let Some(handle) = self.cached_start(fingerprint).await? {
88 return Ok(handle);
89 }
90 }
91 let response = self
92 .transport
93 .start_workflow(ProtoStartWorkflowRequest {
94 namespace,
95 workflow_type,
96 input: Some(ProtoPayload::from(input)),
97 })
98 .await?;
99 let workflow_id = decode_required_workflow_id(response.workflow_id, "start response")?;
100 let run_id = decode_required_run_id(response.run_id, "start response")?;
101 let handle = WorkflowHandle::from_ids(self.clone(), workflow_id, run_id);
102 if let Some(fingerprint) = fingerprint {
103 self.record_start(fingerprint, handle.clone()).await?;
104 }
105 Ok(handle)
106 }
107
108 pub async fn start_typed<T>(
115 &self,
116 workflow_type: impl Into<String>,
117 input: &T,
118 opts: StartOptions,
119 ) -> Result<WorkflowHandle, ClientError>
120 where
121 T: Serialize + ?Sized,
122 {
123 self.start(workflow_type, to_payload(input)?, opts).await
124 }
125
126 pub async fn signal(
132 &self,
133 workflow_id: &WorkflowId,
134 run_id: Option<&RunId>,
135 name: impl Into<String>,
136 payload: Payload,
137 ) -> Result<(), ClientError> {
138 self.transport
139 .signal(ProtoSignalRequest {
140 namespace: self.namespace().to_owned(),
141 workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
142 run_id: run_id.cloned().map(ProtoRunId::from),
143 signal_name: name.into(),
144 payload: Some(ProtoPayload::from(payload)),
145 })
146 .await?;
147 Ok(())
148 }
149
150 pub async fn signal_typed<T>(
157 &self,
158 workflow_id: &WorkflowId,
159 run_id: Option<&RunId>,
160 name: impl Into<String>,
161 value: &T,
162 ) -> Result<(), ClientError>
163 where
164 T: Serialize + ?Sized,
165 {
166 self.signal(workflow_id, run_id, name, to_payload(value)?)
167 .await
168 }
169
170 pub async fn query(
180 &self,
181 workflow_id: &WorkflowId,
182 run_id: Option<&RunId>,
183 name: impl Into<String>,
184 args: Payload,
185 deadline: Duration,
186 ) -> Result<Payload, ClientError> {
187 validate_query_args(&args)?;
188 let response = tokio::time::timeout(
189 deadline,
190 self.transport.query(ProtoQueryRequest {
191 namespace: self.namespace().to_owned(),
192 workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
193 run_id: run_id.cloned().map(ProtoRunId::from),
194 query_name: name.into(),
195 }),
196 )
197 .await
198 .map_err(|_| {
199 ClientError::query_timeout(format!(
200 "query deadline of {deadline:?} elapsed before the server replied"
201 ))
202 })??;
203
204 match response.outcome {
205 Some(proto_query_response::Outcome::Result(payload)) => {
206 Payload::try_from(payload).map_err(ClientError::from_wire_error)
207 }
208 Some(proto_query_response::Outcome::Error(error)) => Err(query_error(error)),
209 None => Err(ClientError::server("query response outcome is missing")),
210 }
211 }
212
213 pub async fn query_typed<A, R>(
220 &self,
221 workflow_id: &WorkflowId,
222 run_id: Option<&RunId>,
223 name: impl Into<String>,
224 args: &A,
225 deadline: Duration,
226 ) -> Result<R, ClientError>
227 where
228 A: Serialize + ?Sized,
229 R: DeserializeOwned,
230 {
231 let payload = self
232 .query(
233 workflow_id,
234 run_id,
235 name,
236 query_args_payload(args)?,
237 deadline,
238 )
239 .await?;
240 from_payload(&payload)
241 }
242
243 pub async fn cancel(
252 &self,
253 workflow_id: &WorkflowId,
254 run_id: Option<&RunId>,
255 reason: impl Into<String>,
256 ) -> Result<(), ClientError> {
257 self.transport
258 .cancel(ProtoCancelRequest {
259 namespace: self.namespace().to_owned(),
260 workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
261 run_id: run_id.cloned().map(ProtoRunId::from),
262 reason: reason.into(),
263 })
264 .await?;
265 Ok(())
266 }
267
268 pub async fn list(
274 &self,
275 filter: &WorkflowFilter,
276 page: ListPage,
277 ) -> Result<Vec<WorkflowSummary>, ClientError> {
278 validate_list_page(&page)?;
279 let namespace = self.namespace().to_owned();
280 let filter = workflow_filter_to_visibility(filter)?;
281 let filter = encode_core_value(namespace.clone(), page.request_id, &filter)
282 .map_err(ClientError::from_wire_error)?;
283 let response = self
284 .transport
285 .list_workflows(ProtoListWorkflowsRequest {
286 namespace,
287 filter: Some(filter),
288 })
289 .await?;
290
291 response
292 .summaries
293 .iter()
294 .map(decode_visibility_summary)
295 .map(|result| result.map_err(ClientError::from_wire_error))
296 .collect()
297 }
298
299 pub async fn describe(
305 &self,
306 workflow_id: &WorkflowId,
307 run_id: Option<&RunId>,
308 ) -> Result<WorkflowDescription, ClientError> {
309 let response = self
310 .transport
311 .describe_workflow(ProtoDescribeWorkflowRequest {
312 namespace: self.namespace().to_owned(),
313 workflow_id: Some(ProtoWorkflowId::from(workflow_id.clone())),
314 run_id: run_id.cloned().map(ProtoRunId::from),
315 include_history: true,
316 })
317 .await?;
318 let summary = response
319 .summary
320 .as_ref()
321 .ok_or_else(|| ClientError::server("describe response summary is missing"))
322 .and_then(|summary| {
323 decode_workflow_summary(summary).map_err(ClientError::from_wire_error)
324 })?;
325 let history = response
326 .history
327 .iter()
328 .map(decode_event)
329 .map(|result| result.map_err(ClientError::from_wire_error))
330 .collect::<Result<Vec<_>, _>>()?;
331 Ok(WorkflowDescription { summary, history })
332 }
333
334 #[must_use]
336 pub fn subscribe_workflow(&self, workflow_id: &WorkflowId) -> EventStream {
337 event_stream(
338 self.transport.clone(),
339 self.namespace().to_owned(),
340 SubscribeTarget::Workflow {
341 workflow_id: workflow_id.clone(),
342 },
343 )
344 }
345
346 #[must_use]
353 pub fn subscribe_workflow_from(
354 &self,
355 workflow_id: &WorkflowId,
356 resume_from: NonZeroU64,
357 ) -> EventStream {
358 event_stream_from(
359 self.transport.clone(),
360 self.namespace().to_owned(),
361 workflow_id.clone(),
362 resume_from,
363 )
364 }
365
366 #[must_use]
368 pub fn subscribe(&self, filter: WorkflowFilter) -> EventStream {
369 event_stream(
370 self.transport.clone(),
371 self.namespace().to_owned(),
372 SubscribeTarget::Filtered { filter },
373 )
374 }
375
376 #[must_use]
378 pub fn subscribe_firehose(&self) -> EventStream {
379 event_stream(
380 self.transport.clone(),
381 self.namespace().to_owned(),
382 SubscribeTarget::Firehose,
383 )
384 }
385}
386
387#[derive(Clone, Debug, PartialEq, Eq)]
388pub(crate) struct StartFingerprint {
389 namespace: String,
390 workflow_type: String,
391 content_type: aion_core::ContentType,
392 bytes: Vec<u8>,
393 idempotency_key: String,
394}
395
396impl StartFingerprint {
397 fn new(
398 namespace: String,
399 workflow_type: String,
400 input: &Payload,
401 idempotency_key: String,
402 ) -> Self {
403 Self {
404 namespace,
405 workflow_type,
406 content_type: input.content_type().clone(),
407 bytes: input.bytes().to_vec(),
408 idempotency_key,
409 }
410 }
411
412 pub(crate) fn key(&self) -> &str {
413 &self.idempotency_key
414 }
415}
416
417fn operation_namespace(client: &Client, namespace: Option<String>) -> String {
418 namespace.unwrap_or_else(|| client.namespace().to_owned())
419}
420
421fn validate_start_options(opts: &StartOptions) -> Result<(), ClientError> {
422 if opts
423 .idempotency_key
424 .as_ref()
425 .is_some_and(std::string::String::is_empty)
426 {
427 return Err(ClientError::invalid_argument(
428 "idempotency_key must not be empty",
429 ));
430 }
431 Ok(())
432}
433
434fn validate_query_args(args: &Payload) -> Result<(), ClientError> {
435 if !args.bytes().is_empty() {
436 return Err(ClientError::invalid_argument(
437 "query arguments are not carried by the current wire contract; \
438 pass an empty payload",
439 ));
440 }
441 Ok(())
442}
443
444fn query_args_payload<T>(args: &T) -> Result<Payload, ClientError>
445where
446 T: Serialize + ?Sized,
447{
448 let payload = to_payload(args)?;
449 if payload.bytes() == b"null" {
450 Ok(Payload::new(payload.content_type().clone(), Vec::new()))
451 } else {
452 Ok(payload)
453 }
454}
455
456fn validate_list_page(page: &ListPage) -> Result<(), ClientError> {
457 if page.limit.is_some() || page.cursor.is_some() {
458 return Err(ClientError::invalid_argument(
459 "list pagination limit/cursor are reserved by the contract and \
460 not yet carried by the wire",
461 ));
462 }
463 Ok(())
464}
465
466fn workflow_filter_to_visibility(
467 filter: &WorkflowFilter,
468) -> Result<ListWorkflowsFilter, ClientError> {
469 if filter.parent.is_some() {
470 return Err(ClientError::invalid_argument(
471 "parent workflow filters are not carried by the visibility wire contract",
472 ));
473 }
474
475 Ok(ListWorkflowsFilter {
476 workflow_type: filter.workflow_type.clone(),
477 status: filter.status,
478 started_after: filter.started_after,
479 started_before: filter.started_before,
480 ..ListWorkflowsFilter::default()
481 })
482}
483
484fn decode_visibility_summary(
485 envelope: &aion_proto::WireEnvelope,
486) -> Result<WorkflowSummary, WireError> {
487 let summary = decode_core_value::<aion_store::visibility::WorkflowSummary>(envelope)?;
488 Ok(WorkflowSummary {
489 workflow_id: summary.workflow_id,
490 workflow_type: summary.workflow_type,
491 status: summary.status,
492 started_at: summary.start_time,
493 ended_at: summary.close_time,
494 parent: None,
495 })
496}
497
498fn decode_required_workflow_id(
499 value: Option<ProtoWorkflowId>,
500 context: &str,
501) -> Result<WorkflowId, ClientError> {
502 value
503 .ok_or_else(|| ClientError::server(format!("{context} workflow id is missing")))?
504 .try_into()
505 .map_err(ClientError::from_wire_error)
506}
507
508fn decode_required_run_id(value: Option<ProtoRunId>, context: &str) -> Result<RunId, ClientError> {
509 value
510 .ok_or_else(|| ClientError::server(format!("{context} run id is missing")))?
511 .try_into()
512 .map_err(ClientError::from_wire_error)
513}
514
515fn query_error(error: aion_proto::ProtoWireError) -> ClientError {
521 ClientError::from_proto_wire_error(error)
522}
523
/// Unit tests that drive the client operations against an in-memory stub transport.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use aion_core::{ContentType, Payload, WorkflowFilter, WorkflowId, WorkflowStatus};
    use aion_proto::{
        ProtoCancelResponse, ProtoDescribeWorkflowResponse, ProtoListWorkflowsResponse,
        ProtoQueryResponse, ProtoRunId, ProtoSignalResponse, ProtoStartWorkflowResponse,
        ProtoWorkflowId, WireError, encode_core_value, encode_workflow_summary,
        proto_query_response,
    };
    use async_trait::async_trait;
    use chrono::Utc;
    use futures::StreamExt;
    use futures::stream;
    use tokio::sync::Mutex;

    use super::{ListPage, StartOptions};
    use crate::client::{Client, ClientBuilder, ClientConfig};
    use crate::error::ClientError;
    use crate::transport::{SubscriptionAttempt, WorkflowTransport};

    /// Deadline handed to every query issued by these tests.
    const QUERY_DEADLINE: Duration = Duration::from_secs(1);

    /// Transport double that remembers the latest request of each kind and answers with
    /// canned responses, optionally failing once with a preloaded error.
    #[derive(Default)]
    struct StubTransport {
        // Latest request seen per operation.
        last_start: Mutex<Option<aion_proto::ProtoStartWorkflowRequest>>,
        last_signal: Mutex<Option<aion_proto::ProtoSignalRequest>>,
        last_query: Mutex<Option<aion_proto::ProtoQueryRequest>>,
        last_cancel: Mutex<Option<aion_proto::ProtoCancelRequest>>,
        last_list: Mutex<Option<aion_proto::ProtoListWorkflowsRequest>>,
        last_describe: Mutex<Option<aion_proto::ProtoDescribeWorkflowRequest>>,
        // One-shot overrides: each is consumed by the next matching call.
        start_error: Mutex<Option<ClientError>>,
        signal_error: Mutex<Option<ClientError>>,
        query_response: Mutex<Option<Result<ProtoQueryResponse, ClientError>>>,
    }

    #[async_trait]
    impl WorkflowTransport for StubTransport {
        async fn start_workflow(
            &self,
            request: aion_proto::ProtoStartWorkflowRequest,
        ) -> Result<ProtoStartWorkflowResponse, ClientError> {
            *self.last_start.lock().await = Some(request);
            match self.start_error.lock().await.take() {
                Some(preloaded) => Err(preloaded),
                None => Ok(ProtoStartWorkflowResponse {
                    workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
                    run_id: Some(ProtoRunId::from(run_id())),
                }),
            }
        }

        async fn signal(
            &self,
            request: aion_proto::ProtoSignalRequest,
        ) -> Result<ProtoSignalResponse, ClientError> {
            *self.last_signal.lock().await = Some(request);
            match self.signal_error.lock().await.take() {
                Some(preloaded) => Err(preloaded),
                None => Ok(ProtoSignalResponse {}),
            }
        }

        async fn query(
            &self,
            request: aion_proto::ProtoQueryRequest,
        ) -> Result<ProtoQueryResponse, ClientError> {
            *self.last_query.lock().await = Some(request);
            match self.query_response.lock().await.take() {
                Some(canned) => canned,
                None => Ok(ProtoQueryResponse {
                    outcome: Some(proto_query_response::Outcome::Result(
                        aion_proto::ProtoPayload::from(payload("result")),
                    )),
                }),
            }
        }

        async fn cancel(
            &self,
            request: aion_proto::ProtoCancelRequest,
        ) -> Result<ProtoCancelResponse, ClientError> {
            *self.last_cancel.lock().await = Some(request);
            Ok(ProtoCancelResponse {})
        }

        async fn list_workflows(
            &self,
            request: aion_proto::ProtoListWorkflowsRequest,
        ) -> Result<ProtoListWorkflowsResponse, ClientError> {
            *self.last_list.lock().await = Some(request);
            let only_entry = encode_core_value("tenant-a", None, &visibility_summary())
                .map_err(ClientError::from_wire_error)?;
            Ok(ProtoListWorkflowsResponse {
                summaries: vec![only_entry],
            })
        }

        async fn describe_workflow(
            &self,
            request: aion_proto::ProtoDescribeWorkflowRequest,
        ) -> Result<ProtoDescribeWorkflowResponse, ClientError> {
            *self.last_describe.lock().await = Some(request);
            let encoded_summary = encode_workflow_summary("tenant-a", None, &summary())
                .map_err(ClientError::from_wire_error)?;
            Ok(ProtoDescribeWorkflowResponse {
                summary: Some(encoded_summary),
                history: Vec::new(),
            })
        }

        async fn subscribe(
            &self,
            _: aion_proto::SubscriptionRequest,
            _: Option<u64>,
        ) -> Result<SubscriptionAttempt, ClientError> {
            Ok(SubscriptionAttempt::new(stream::empty().boxed()))
        }
    }

    /// Builds a client for namespace `tenant-a` that talks to `stub`.
    fn client_with(stub: Arc<StubTransport>) -> Client {
        let builder = ClientBuilder::new("http://localhost:50051").with_namespace("tenant-a");
        Client::from_transport(ClientConfig::from(builder), stub)
    }

    /// Creates a fresh stub together with a client wired to it.
    fn stub_and_client() -> (Arc<StubTransport>, Client) {
        let stub = Arc::new(StubTransport::default());
        let client = client_with(Arc::clone(&stub));
        (stub, client)
    }

    /// Wraps `error` as the error outcome of a query response.
    fn error_outcome(error: WireError) -> ProtoQueryResponse {
        ProtoQueryResponse {
            outcome: Some(proto_query_response::Outcome::Error(
                aion_proto::ProtoWireError::from(error),
            )),
        }
    }

    /// Fresh random workflow id.
    fn workflow_id() -> WorkflowId {
        WorkflowId::new_v4()
    }

    /// Fresh random run id.
    fn run_id() -> aion_core::RunId {
        aion_core::RunId::new_v4()
    }

    /// JSON payload of the form `{"label":"<label>"}`.
    fn payload(label: &str) -> Payload {
        let body = format!("{{\"label\":\"{label}\"}}");
        Payload::new(ContentType::Json, body.into_bytes())
    }

    /// JSON-typed payload with no bytes.
    fn empty_payload() -> Payload {
        Payload::new(ContentType::Json, Vec::new())
    }

    /// Core summary of a running `checkout` workflow.
    fn summary() -> aion_core::WorkflowSummary {
        aion_core::WorkflowSummary {
            workflow_id: workflow_id(),
            workflow_type: String::from("checkout"),
            status: WorkflowStatus::Running,
            started_at: Utc::now(),
            ended_at: None,
            parent: None,
        }
    }

    /// Visibility-store summary of a running `checkout` workflow.
    fn visibility_summary() -> aion_store::visibility::WorkflowSummary {
        aion_store::visibility::WorkflowSummary {
            workflow_id: workflow_id(),
            run_id: run_id(),
            workflow_type: String::from("checkout"),
            status: WorkflowStatus::Running,
            start_time: Utc::now(),
            close_time: None,
            search_attributes: std::collections::HashMap::new(),
        }
    }

    #[tokio::test]
    async fn start_maps_request_and_returns_handle() -> Result<(), ClientError> {
        let (stub, client) = stub_and_client();

        let handle = client
            .start("checkout", payload("input"), StartOptions::default())
            .await?;

        let sent = stub.last_start.lock().await.clone();
        assert!(sent.is_some());
        let sent = sent.ok_or_else(|| ClientError::server("missing recorded start"))?;
        assert_eq!(sent.namespace, "tenant-a");
        assert_eq!(sent.workflow_type, "checkout");
        assert!(sent.input.is_some());
        assert_ne!(handle.workflow_id(), &WorkflowId::new(uuid::Uuid::nil()));
        Ok(())
    }

    #[tokio::test]
    async fn start_idempotency_replays_identical_and_rejects_conflicts() -> Result<(), ClientError>
    {
        let (_stub, client) = stub_and_client();
        let keyed = StartOptions {
            namespace: None,
            idempotency_key: Some(String::from("retry-key")),
        };

        let first = client
            .start("checkout", payload("input"), keyed.clone())
            .await?;
        let second = client
            .start("checkout", payload("input"), keyed.clone())
            .await?;
        // Same key, different input.
        let conflict = client.start("checkout", payload("other"), keyed).await;

        assert_eq!(second, first);
        assert!(
            matches!(conflict, Err(ClientError::AlreadyExists { .. })),
            "got {conflict:?}"
        );
        Ok(())
    }

    #[tokio::test]
    async fn signal_maps_latest_run_and_error() {
        let (stub, client) = stub_and_client();
        *stub.signal_error.lock().await = Some(ClientError::not_found("workflow was not found"));
        let target = workflow_id();

        let outcome = client
            .signal(&target, None, "approve", payload("signal"))
            .await;

        assert_eq!(
            outcome,
            Err(ClientError::not_found("workflow was not found"))
        );
        let sent = stub.last_signal.lock().await.clone();
        assert!(sent.is_some());
        if let Some(request) = sent {
            assert!(request.run_id.is_none());
        }
    }

    #[tokio::test]
    async fn query_maps_result_error_and_deadline() -> Result<(), ClientError> {
        let (stub, client) = stub_and_client();
        *stub.query_response.lock().await =
            Some(Ok(error_outcome(WireError::query_timeout("slow"))));
        let target = workflow_id();

        let timed_out = client
            .query(
                &target,
                Some(&run_id()),
                "state",
                empty_payload(),
                QUERY_DEADLINE,
            )
            .await;
        let unsupported_args = client
            .query(&target, None, "state", payload("args"), QUERY_DEADLINE)
            .await;

        assert_eq!(timed_out, Err(ClientError::query_timeout("slow")));
        assert!(
            matches!(unsupported_args, Err(ClientError::InvalidArgument { .. })),
            "got {unsupported_args:?}"
        );
        let sent = stub.last_query.lock().await.clone();
        assert!(sent.is_some());
        let sent = sent.ok_or_else(|| ClientError::server("missing query"))?;
        assert!(sent.run_id.is_some());
        Ok(())
    }

    #[tokio::test]
    async fn query_failed_outcome_error_maps_to_query_failed() -> Result<(), ClientError> {
        let (stub, client) = stub_and_client();
        *stub.query_response.lock().await =
            Some(Ok(error_outcome(WireError::query_failed("handler raised"))));

        let outcome = client
            .query(
                &workflow_id(),
                Some(&run_id()),
                "state",
                empty_payload(),
                QUERY_DEADLINE,
            )
            .await;

        assert_eq!(outcome, Err(ClientError::query_failed("handler raised")));
        Ok(())
    }

    #[tokio::test]
    async fn backend_outcome_error_is_a_server_fault_not_query_failed() -> Result<(), ClientError> {
        let (stub, client) = stub_and_client();
        // A backend failure reported through the outcome must surface as a server error.
        *stub.query_response.lock().await =
            Some(Ok(error_outcome(WireError::backend("store down"))));

        let outcome = client
            .query(
                &workflow_id(),
                Some(&run_id()),
                "state",
                empty_payload(),
                QUERY_DEADLINE,
            )
            .await;

        assert_eq!(outcome, Err(ClientError::server("store down")));
        Ok(())
    }

    #[tokio::test]
    async fn query_typed_decodes_no_arg_query_result() -> Result<(), ClientError> {
        #[derive(serde::Deserialize, PartialEq, Eq, Debug)]
        struct QueryResult {
            label: String,
        }

        let (stub, client) = stub_and_client();
        let target = workflow_id();

        let decoded: QueryResult = client
            .query_typed(&target, Some(&run_id()), "state", &(), QUERY_DEADLINE)
            .await?;

        let expected = QueryResult {
            label: String::from("result"),
        };
        assert_eq!(decoded, expected);
        assert!(stub.last_query.lock().await.is_some());
        Ok(())
    }

    #[tokio::test]
    async fn query_typed_rejects_non_empty_args_without_silent_drop() {
        let (stub, client) = stub_and_client();
        let target = workflow_id();
        let args = serde_json::json!({ "filter": "open" });

        let result = client
            .query_typed::<_, serde_json::Value>(
                &target,
                Some(&run_id()),
                "state",
                &args,
                QUERY_DEADLINE,
            )
            .await;

        assert!(
            matches!(result, Err(ClientError::InvalidArgument { .. })),
            "got {result:?}"
        );
        // The transport must never have been reached.
        assert!(stub.last_query.lock().await.is_none());
    }

    #[tokio::test]
    async fn cancel_list_and_describe_map_requests() -> Result<(), ClientError> {
        let (stub, client) = stub_and_client();
        let target = workflow_id();
        let run = run_id();

        client.cancel(&target, Some(&run), "not needed").await?;
        let listed = client
            .list(&WorkflowFilter::default(), ListPage::default())
            .await?;
        let described = client.describe(&target, None).await?;

        assert!(stub.last_cancel.lock().await.is_some());
        assert!(stub.last_list.lock().await.is_some());
        let describe_request = stub
            .last_describe
            .lock()
            .await
            .clone()
            .ok_or_else(|| ClientError::server("missing describe"))?;
        assert!(describe_request.run_id.is_none());
        assert!(describe_request.include_history);
        assert_eq!(listed.len(), 1);
        assert_eq!(described.history.len(), 0);
        Ok(())
    }
}