1use crate::convert::{parse_timestamp, uuid_to_proto};
4use crate::error::{ClientError, Result};
5use crate::proto::{
6 page_header::SequenceType, query::Selection, temporal_query::PointInTime, CommandBook,
7 CommandPage, CommandResponse, Cover, Edition, EventBook, EventPage, PageHeader, Query,
8 SequenceRange, TemporalQuery,
9};
10use crate::traits;
11use prost::Message;
12use uuid::Uuid;
13
14pub struct CommandBuilder<'a, C: traits::GatewayClient> {
16 client: &'a C,
17 domain: String,
18 root: Uuid,
19 correlation_id: Option<String>,
20 sequence: Option<u32>,
21 merge_strategy: crate::proto::MergeStrategy,
22 type_url: Option<String>,
23 payload: Option<Vec<u8>>,
24}
25
26impl<'a, C: traits::GatewayClient> CommandBuilder<'a, C> {
27 pub(crate) fn new(client: &'a C, domain: impl Into<String>, root: Uuid) -> Self {
28 Self {
29 client,
30 domain: domain.into(),
31 root,
32 correlation_id: None,
33 sequence: None,
34 merge_strategy: crate::proto::MergeStrategy::MergeCommutative,
35 type_url: None,
36 payload: None,
37 }
38 }
39
40 pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
43 self.correlation_id = Some(id.into());
44 self
45 }
46
47 pub fn with_sequence(mut self, seq: u32) -> Self {
50 self.sequence = Some(seq);
51 self
52 }
53
54 pub fn with_merge_strategy(mut self, strategy: crate::proto::MergeStrategy) -> Self {
57 self.merge_strategy = strategy;
58 self
59 }
60
61 pub fn with_command<M: Message>(mut self, type_url: impl Into<String>, message: &M) -> Self {
63 self.type_url = Some(type_url.into());
64 self.payload = Some(message.encode_to_vec());
65 self
66 }
67
68 pub fn build(self) -> Result<CommandBook> {
70 let type_url = self.type_url.ok_or_else(|| ClientError::InvalidArgument {
71 msg: "command type_url not set".to_string(),
72 })?;
73 let payload = self.payload.ok_or_else(|| ClientError::InvalidArgument {
74 msg: "command payload not set".to_string(),
75 })?;
76 let sequence = self.sequence.ok_or_else(|| ClientError::InvalidArgument {
77 msg: "command sequence not set".to_string(),
78 })?;
79 let correlation_id = self
80 .correlation_id
81 .unwrap_or_else(|| Uuid::new_v4().to_string());
82
83 Ok(CommandBook {
84 cover: Some(Cover {
85 domain: self.domain,
86 root: Some(uuid_to_proto(self.root)),
87 correlation_id,
88 edition: None,
89 }),
90 pages: vec![CommandPage {
91 header: Some(PageHeader {
92 sequence_type: Some(SequenceType::Sequence(sequence)),
93 }),
94 merge_strategy: self.merge_strategy as i32,
95 payload: Some(crate::proto::command_page::Payload::Command(
96 prost_types::Any {
97 type_url,
98 value: payload,
99 },
100 )),
101 }],
102 })
103 }
104
105 pub async fn execute(self) -> Result<CommandResponse> {
107 let client = self.client;
108 let command = self.build()?;
109 client.execute(command).await
110 }
111}
112
113pub struct QueryBuilder<'a, C: traits::QueryClient> {
115 client: &'a C,
116 domain: String,
117 root: Option<Uuid>,
118 correlation_id: Option<String>,
119 selection: Option<Selection>,
120 edition: Option<String>,
121}
122
123impl<'a, C: traits::QueryClient> QueryBuilder<'a, C> {
124 pub(crate) fn new(client: &'a C, domain: impl Into<String>, root: Option<Uuid>) -> Self {
125 Self {
126 client,
127 domain: domain.into(),
128 root,
129 correlation_id: None,
130 selection: None,
131 edition: None,
132 }
133 }
134
135 pub fn by_correlation_id(mut self, id: impl Into<String>) -> Self {
137 self.correlation_id = Some(id.into());
138 self.root = None;
139 self
140 }
141
142 pub fn edition(mut self, edition: impl Into<String>) -> Self {
144 self.edition = Some(edition.into());
145 self
146 }
147
148 pub fn range(mut self, lower: u32) -> Self {
150 self.selection = Some(Selection::Range(SequenceRange { lower, upper: None }));
151 self
152 }
153
154 pub fn range_to(mut self, lower: u32, upper: u32) -> Self {
156 self.selection = Some(Selection::Range(SequenceRange {
157 lower,
158 upper: Some(upper),
159 }));
160 self
161 }
162
163 pub fn as_of_sequence(mut self, seq: u32) -> Self {
165 self.selection = Some(Selection::Temporal(TemporalQuery {
166 point_in_time: Some(PointInTime::AsOfSequence(seq)),
167 }));
168 self
169 }
170
171 pub fn as_of_time(mut self, rfc3339: &str) -> Result<Self> {
173 let timestamp = parse_timestamp(rfc3339)?;
174 self.selection = Some(Selection::Temporal(TemporalQuery {
175 point_in_time: Some(PointInTime::AsOfTime(timestamp)),
176 }));
177 Ok(self)
178 }
179
180 pub fn build(self) -> Query {
182 self.build_inner()
183 }
184
185 fn build_inner(&self) -> Query {
186 Query {
187 cover: Some(Cover {
188 domain: self.domain.clone(),
189 root: self.root.map(uuid_to_proto),
190 correlation_id: self.correlation_id.clone().unwrap_or_default(),
191 edition: self.edition.clone().map(Edition::from),
192 }),
193 selection: self.selection.clone(),
194 }
195 }
196
197 pub async fn get_events(self) -> Result<EventBook> {
199 let client = self.client;
200 let query = self.build_inner();
201 client.get_events(query).await
202 }
203
204 pub async fn get_pages(self) -> Result<Vec<EventPage>> {
206 let client = self.client;
207 let query = self.build_inner();
208 let event_book = client.get_events(query).await?;
209 Ok(event_book.pages)
210 }
211}
212
213pub trait CommandBuilderExt: traits::GatewayClient + Sized {
215 fn command(&self, domain: impl Into<String>, root: Uuid) -> CommandBuilder<'_, Self> {
217 CommandBuilder::new(self, domain, root)
218 }
219}
220
221impl<T: traits::GatewayClient> CommandBuilderExt for T {}
222
223pub trait QueryBuilderExt: traits::QueryClient + Sized {
225 fn query(&self, domain: impl Into<String>, root: Uuid) -> QueryBuilder<'_, Self> {
227 QueryBuilder::new(self, domain, Some(root))
228 }
229
230 fn query_domain(&self, domain: impl Into<String>) -> QueryBuilder<'_, Self> {
232 QueryBuilder::new(self, domain, None)
233 }
234}
235
236impl<T: traits::QueryClient> QueryBuilderExt for T {}
237
238pub fn root_from_cover(cover: &Cover) -> Option<Uuid> {
240 cover
241 .root
242 .as_ref()
243 .and_then(|r| Uuid::from_slice(&r.value).ok())
244}
245
246pub fn events_from_response(response: &CommandResponse) -> &[EventPage] {
248 response
249 .events
250 .as_ref()
251 .map(|e| e.pages.as_slice())
252 .unwrap_or(&[])
253}
254
255pub fn decode_event<M: Message + Default>(event: &EventPage, type_suffix: &str) -> Option<M> {
257 let any = match &event.payload {
258 Some(crate::proto::event_page::Payload::Event(e)) => e,
259 _ => return None,
260 };
261 if !any.type_url.ends_with(type_suffix) {
262 return None;
263 }
264 M::decode(any.value.as_slice()).ok()
265}
266
#[cfg(test)]
mod tests {
    // Unit tests for the builders, the extension traits and the free helper
    // functions. Shared fixtures live at the top; tests follow grouped by the
    // item they exercise.

    use super::*;
    use crate::proto::event_page::Payload;
    use crate::proto::{Cover, Uuid as ProtoUuid};
    use async_trait::async_trait;

    const COMMAND_TYPE_URL: &str = "type.googleapis.com/test.Command";
    const DURATION_TYPE_URL: &str = "type.googleapis.com/google.protobuf.Duration";

    /// Query client stub that answers every query with a clone of `event_book`.
    struct MockQueryClient {
        event_book: EventBook,
    }

    #[async_trait]
    impl traits::QueryClient for MockQueryClient {
        async fn get_events(&self, _query: Query) -> Result<EventBook> {
            Ok(self.event_book.clone())
        }
    }

    /// Gateway client stub that answers every command with a clone of `response`.
    struct MockGatewayClient {
        response: CommandResponse,
    }

    #[async_trait]
    impl traits::GatewayClient for MockGatewayClient {
        async fn execute(&self, _command: CommandBook) -> Result<CommandResponse> {
            Ok(self.response.clone())
        }
    }

    /// Gateway stub holding a default response.
    fn gateway() -> MockGatewayClient {
        MockGatewayClient {
            response: CommandResponse::default(),
        }
    }

    /// Query stub holding a default event book.
    fn query_client() -> MockQueryClient {
        MockQueryClient {
            event_book: EventBook::default(),
        }
    }

    /// Message used as a stand-in command/event body throughout the tests.
    fn sample_duration() -> prost_types::Duration {
        prost_types::Duration {
            seconds: 42,
            nanos: 0,
        }
    }

    /// Event page at sequence 1 carrying the given payload.
    fn event_page(payload: Option<Payload>) -> EventPage {
        EventPage {
            header: Some(PageHeader {
                sequence_type: Some(SequenceType::Sequence(1)),
            }),
            created_at: None,
            payload,
        }
    }

    /// Event page whose payload is tagged as a `Duration` and holds `value`.
    fn duration_event(value: Vec<u8>) -> EventPage {
        event_page(Some(Payload::Event(prost_types::Any {
            type_url: DURATION_TYPE_URL.to_string(),
            value,
        })))
    }

    fn make_cover(domain: &str, correlation_id: &str, root: Option<Uuid>) -> Cover {
        let root = root.map(|id| ProtoUuid {
            value: id.as_bytes().to_vec(),
        });
        Cover {
            domain: domain.to_string(),
            correlation_id: correlation_id.to_string(),
            root,
            edition: None,
        }
    }

    // ---- CommandBuilder ----

    #[test]
    fn test_command_builder_with_correlation_id() {
        let client = gateway();
        let builder =
            CommandBuilder::new(&client, "orders", Uuid::new_v4()).with_correlation_id("corr-123");

        assert_eq!(builder.correlation_id.as_deref(), Some("corr-123"));
    }

    #[test]
    fn test_command_builder_with_sequence() {
        let client = gateway();
        let builder = CommandBuilder::new(&client, "orders", Uuid::new_v4()).with_sequence(42);

        assert_eq!(builder.sequence, Some(42));
    }

    #[test]
    fn test_command_builder_with_command() {
        let client = gateway();
        let body = sample_duration();
        let builder = CommandBuilder::new(&client, "orders", Uuid::new_v4())
            .with_command(COMMAND_TYPE_URL, &body);

        assert_eq!(builder.type_url.as_deref(), Some(COMMAND_TYPE_URL));
        assert!(builder.payload.is_some());
    }

    #[test]
    fn test_command_builder_build_success() {
        let client = gateway();
        let body = sample_duration();
        let book = CommandBuilder::new(&client, "orders", Uuid::new_v4())
            .with_correlation_id("corr-123")
            .with_sequence(5)
            .with_command(COMMAND_TYPE_URL, &body)
            .build()
            .unwrap();

        let cover = book.cover.unwrap();
        assert_eq!(cover.domain, "orders");
        assert_eq!(cover.correlation_id, "corr-123");
        assert!(cover.root.is_some());
        assert_eq!(book.pages.len(), 1);

        let header = book.pages[0].header.as_ref().unwrap();
        if let Some(SequenceType::Sequence(seq)) = &header.sequence_type {
            assert_eq!(*seq, 5);
        } else {
            panic!("expected explicit sequence");
        }
    }

    #[test]
    fn test_command_builder_build_generates_correlation_id() {
        let client = gateway();
        let body = sample_duration();
        let book = CommandBuilder::new(&client, "orders", Uuid::new_v4())
            .with_sequence(0)
            .with_command(COMMAND_TYPE_URL, &body)
            .build()
            .unwrap();

        let cover = book.cover.unwrap();
        assert!(!cover.correlation_id.is_empty());
    }

    #[test]
    fn test_command_builder_build_missing_type_url() {
        let client = gateway();
        let outcome = CommandBuilder::new(&client, "orders", Uuid::new_v4())
            .with_sequence(0)
            .build();

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().is_invalid_argument());
    }

    #[test]
    fn test_command_builder_build_missing_payload() {
        let client = gateway();
        // Set the type URL directly so only the payload is missing.
        let builder = CommandBuilder {
            type_url: Some("type.googleapis.com/test".to_string()),
            sequence: Some(0),
            ..CommandBuilder::new(&client, "orders", Uuid::new_v4())
        };

        assert!(builder.build().is_err());
    }

    #[test]
    fn test_command_builder_build_missing_sequence() {
        let client = gateway();
        let body = sample_duration();
        let outcome = CommandBuilder::new(&client, "orders", Uuid::new_v4())
            .with_command(COMMAND_TYPE_URL, &body)
            .build();

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().is_invalid_argument());
    }

    // ---- QueryBuilder ----

    #[test]
    fn test_query_builder_by_correlation_id() {
        let client = query_client();
        let builder = QueryBuilder::new(&client, "orders", Some(Uuid::new_v4()))
            .by_correlation_id("corr-123");

        assert_eq!(builder.correlation_id.as_deref(), Some("corr-123"));
        assert!(builder.root.is_none());
    }

    #[test]
    fn test_query_builder_edition() {
        let client = query_client();
        let builder = QueryBuilder::new(&client, "orders", None).edition("test-edition");

        assert_eq!(builder.edition.as_deref(), Some("test-edition"));
    }

    #[test]
    fn test_query_builder_range() {
        let client = query_client();
        let builder = QueryBuilder::new(&client, "orders", None).range(10);

        if let Some(Selection::Range(range)) = builder.selection {
            assert_eq!(range.lower, 10);
            assert!(range.upper.is_none());
        } else {
            panic!("expected Range selection");
        }
    }

    #[test]
    fn test_query_builder_range_to() {
        let client = query_client();
        let builder = QueryBuilder::new(&client, "orders", None).range_to(5, 15);

        if let Some(Selection::Range(range)) = builder.selection {
            assert_eq!(range.lower, 5);
            assert_eq!(range.upper, Some(15));
        } else {
            panic!("expected Range selection");
        }
    }

    #[test]
    fn test_query_builder_as_of_sequence() {
        let client = query_client();
        let builder = QueryBuilder::new(&client, "orders", None).as_of_sequence(42);

        let temporal = match builder.selection {
            Some(Selection::Temporal(temporal)) => temporal,
            _ => panic!("expected Temporal selection"),
        };
        match temporal.point_in_time {
            Some(PointInTime::AsOfSequence(seq)) => assert_eq!(seq, 42),
            _ => panic!("expected AsOfSequence"),
        }
    }

    #[test]
    fn test_query_builder_as_of_time_valid() {
        let client = query_client();
        let builder = QueryBuilder::new(&client, "orders", None)
            .as_of_time("2024-01-15T10:30:00Z")
            .unwrap();

        let temporal = match builder.selection {
            Some(Selection::Temporal(temporal)) => temporal,
            _ => panic!("expected Temporal selection"),
        };
        match temporal.point_in_time {
            Some(PointInTime::AsOfTime(stamp)) => assert_eq!(stamp.seconds, 1705314600),
            _ => panic!("expected AsOfTime"),
        }
    }

    #[test]
    fn test_query_builder_as_of_time_invalid() {
        let client = query_client();
        let outcome = QueryBuilder::new(&client, "orders", None).as_of_time("not a timestamp");

        assert!(outcome.is_err());
    }

    #[test]
    fn test_query_builder_build() {
        let client = query_client();
        let query = QueryBuilder::new(&client, "orders", Some(Uuid::new_v4()))
            .edition("test-edition")
            .range(10)
            .build();

        let cover = query.cover.unwrap();
        assert_eq!(cover.domain, "orders");
        assert!(cover.root.is_some());
        assert!(cover.edition.is_some());
        assert!(query.selection.is_some());
    }

    #[test]
    fn test_query_builder_build_with_correlation_id() {
        let client = query_client();
        let query = QueryBuilder::new(&client, "orders", None)
            .by_correlation_id("corr-123")
            .build();

        let cover = query.cover.unwrap();
        assert_eq!(cover.correlation_id, "corr-123");
        assert!(cover.root.is_none());
    }

    // ---- root_from_cover ----

    #[test]
    fn test_root_from_cover_some() {
        let root = Uuid::new_v4();
        let cover = make_cover("orders", "", Some(root));

        assert_eq!(root_from_cover(&cover), Some(root));
    }

    #[test]
    fn test_root_from_cover_none() {
        let cover = make_cover("orders", "", None);

        assert_eq!(root_from_cover(&cover), None);
    }

    #[test]
    fn test_root_from_cover_invalid_uuid() {
        // Three bytes cannot form a UUID.
        let mut cover = make_cover("orders", "", None);
        cover.root = Some(ProtoUuid {
            value: vec![1, 2, 3],
        });

        assert_eq!(root_from_cover(&cover), None);
    }

    // ---- events_from_response ----

    #[test]
    fn test_events_from_response_with_events() {
        let book = EventBook {
            cover: None,
            pages: vec![EventPage::default(), EventPage::default()],
            snapshot: None,
            next_sequence: 0,
        };
        let response = CommandResponse {
            events: Some(book),
            ..Default::default()
        };

        assert_eq!(events_from_response(&response).len(), 2);
    }

    #[test]
    fn test_events_from_response_no_events() {
        let response = CommandResponse {
            events: None,
            ..Default::default()
        };

        assert!(events_from_response(&response).is_empty());
    }

    // ---- decode_event ----

    #[test]
    fn test_decode_event_success() {
        let event = duration_event(sample_duration().encode_to_vec());

        let decoded: Option<prost_types::Duration> = decode_event(&event, "Duration");
        assert!(decoded.is_some());
        assert_eq!(decoded.unwrap().seconds, 42);
    }

    #[test]
    fn test_decode_event_type_mismatch() {
        let event = duration_event(sample_duration().encode_to_vec());

        let decoded: Option<prost_types::Duration> = decode_event(&event, "Timestamp");
        assert!(decoded.is_none());
    }

    #[test]
    fn test_decode_event_nil_event() {
        let event = event_page(None);

        let decoded: Option<prost_types::Duration> = decode_event(&event, "Duration");
        assert!(decoded.is_none());
    }

    #[test]
    fn test_decode_event_invalid_payload() {
        // Bytes that do not decode as a Duration.
        let event = duration_event(vec![0xFF, 0xFF, 0xFF]);

        let decoded: Option<prost_types::Duration> = decode_event(&event, "Duration");
        assert!(decoded.is_none());
    }

    // ---- extension traits and remaining setters ----

    #[test]
    fn test_command_builder_ext_command() {
        let client = gateway();
        let root = Uuid::new_v4();
        let builder = client.command("orders", root);

        assert_eq!(builder.domain, "orders");
        assert_eq!(builder.root, root);
    }

    #[test]
    fn test_command_builder_with_merge_strategy() {
        let client = gateway();
        let builder = CommandBuilder::new(&client, "orders", Uuid::new_v4())
            .with_merge_strategy(crate::proto::MergeStrategy::MergeStrict);

        assert_eq!(
            builder.merge_strategy,
            crate::proto::MergeStrategy::MergeStrict
        );
    }

    #[test]
    fn test_query_builder_ext_query() {
        let client = query_client();
        let root = Uuid::new_v4();
        let builder = client.query("orders", root);

        assert_eq!(builder.domain, "orders");
        assert_eq!(builder.root, Some(root));
    }

    #[test]
    fn test_query_builder_ext_query_domain() {
        let client = query_client();
        let builder = client.query_domain("orders");

        assert_eq!(builder.domain, "orders");
        assert!(builder.root.is_none());
    }
}