Skip to main content

angzarr_client/
builder.rs

1//! Fluent builders for commands and queries.
2
3use crate::convert::{parse_timestamp, uuid_to_proto};
4use crate::error::{ClientError, Result};
5use crate::proto::{
6    page_header::SequenceType, query::Selection, temporal_query::PointInTime, CommandBook,
7    CommandPage, CommandResponse, Cover, Edition, EventBook, EventPage, PageHeader, Query,
8    SequenceRange, TemporalQuery,
9};
10use crate::traits;
11use prost::Message;
12use uuid::Uuid;
13
14/// Builder for constructing and executing commands.
15pub struct CommandBuilder<'a, C: traits::GatewayClient> {
16    client: &'a C,
17    domain: String,
18    root: Uuid,
19    correlation_id: Option<String>,
20    sequence: Option<u32>,
21    merge_strategy: crate::proto::MergeStrategy,
22    type_url: Option<String>,
23    payload: Option<Vec<u8>>,
24}
25
26impl<'a, C: traits::GatewayClient> CommandBuilder<'a, C> {
27    pub(crate) fn new(client: &'a C, domain: impl Into<String>, root: Uuid) -> Self {
28        Self {
29            client,
30            domain: domain.into(),
31            root,
32            correlation_id: None,
33            sequence: None,
34            merge_strategy: crate::proto::MergeStrategy::MergeCommutative,
35            type_url: None,
36            payload: None,
37        }
38    }
39
40    /// Set the correlation ID for request tracing.
41    /// If not set, a random UUID will be generated.
42    pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
43        self.correlation_id = Some(id.into());
44        self
45    }
46
47    /// Set the expected sequence number for optimistic locking.
48    /// This is required - the builder will fail without it.
49    pub fn with_sequence(mut self, seq: u32) -> Self {
50        self.sequence = Some(seq);
51        self
52    }
53
54    /// Set the merge strategy for conflict resolution.
55    /// Defaults to `MergeCommutative`.
56    pub fn with_merge_strategy(mut self, strategy: crate::proto::MergeStrategy) -> Self {
57        self.merge_strategy = strategy;
58        self
59    }
60
61    /// Set the command type URL and message.
62    pub fn with_command<M: Message>(mut self, type_url: impl Into<String>, message: &M) -> Self {
63        self.type_url = Some(type_url.into());
64        self.payload = Some(message.encode_to_vec());
65        self
66    }
67
68    /// Build the CommandBook without executing.
69    pub fn build(self) -> Result<CommandBook> {
70        let type_url = self.type_url.ok_or_else(|| ClientError::InvalidArgument {
71            msg: "command type_url not set".to_string(),
72        })?;
73        let payload = self.payload.ok_or_else(|| ClientError::InvalidArgument {
74            msg: "command payload not set".to_string(),
75        })?;
76        let sequence = self.sequence.ok_or_else(|| ClientError::InvalidArgument {
77            msg: "command sequence not set".to_string(),
78        })?;
79        let correlation_id = self
80            .correlation_id
81            .unwrap_or_else(|| Uuid::new_v4().to_string());
82
83        Ok(CommandBook {
84            cover: Some(Cover {
85                domain: self.domain,
86                root: Some(uuid_to_proto(self.root)),
87                correlation_id,
88                edition: None,
89            }),
90            pages: vec![CommandPage {
91                header: Some(PageHeader {
92                    sequence_type: Some(SequenceType::Sequence(sequence)),
93                }),
94                merge_strategy: self.merge_strategy as i32,
95                payload: Some(crate::proto::command_page::Payload::Command(
96                    prost_types::Any {
97                        type_url,
98                        value: payload,
99                    },
100                )),
101            }],
102        })
103    }
104
105    /// Execute the command.
106    pub async fn execute(self) -> Result<CommandResponse> {
107        let client = self.client;
108        let command = self.build()?;
109        client.execute(command).await
110    }
111}
112
113/// Builder for constructing and executing queries.
114pub struct QueryBuilder<'a, C: traits::QueryClient> {
115    client: &'a C,
116    domain: String,
117    root: Option<Uuid>,
118    correlation_id: Option<String>,
119    selection: Option<Selection>,
120    edition: Option<String>,
121}
122
123impl<'a, C: traits::QueryClient> QueryBuilder<'a, C> {
124    pub(crate) fn new(client: &'a C, domain: impl Into<String>, root: Option<Uuid>) -> Self {
125        Self {
126            client,
127            domain: domain.into(),
128            root,
129            correlation_id: None,
130            selection: None,
131            edition: None,
132        }
133    }
134
135    /// Query by correlation ID instead of root.
136    pub fn by_correlation_id(mut self, id: impl Into<String>) -> Self {
137        self.correlation_id = Some(id.into());
138        self.root = None;
139        self
140    }
141
142    /// Query events from a specific edition (diverged timeline).
143    pub fn edition(mut self, edition: impl Into<String>) -> Self {
144        self.edition = Some(edition.into());
145        self
146    }
147
148    /// Query a range of sequences (inclusive lower bound).
149    pub fn range(mut self, lower: u32) -> Self {
150        self.selection = Some(Selection::Range(SequenceRange { lower, upper: None }));
151        self
152    }
153
154    /// Query a range of sequences with upper bound (inclusive).
155    pub fn range_to(mut self, lower: u32, upper: u32) -> Self {
156        self.selection = Some(Selection::Range(SequenceRange {
157            lower,
158            upper: Some(upper),
159        }));
160        self
161    }
162
163    /// Query state as of a specific sequence number.
164    pub fn as_of_sequence(mut self, seq: u32) -> Self {
165        self.selection = Some(Selection::Temporal(TemporalQuery {
166            point_in_time: Some(PointInTime::AsOfSequence(seq)),
167        }));
168        self
169    }
170
171    /// Query state as of a specific timestamp (RFC3339 format).
172    pub fn as_of_time(mut self, rfc3339: &str) -> Result<Self> {
173        let timestamp = parse_timestamp(rfc3339)?;
174        self.selection = Some(Selection::Temporal(TemporalQuery {
175            point_in_time: Some(PointInTime::AsOfTime(timestamp)),
176        }));
177        Ok(self)
178    }
179
180    /// Build the Query without executing.
181    pub fn build(self) -> Query {
182        self.build_inner()
183    }
184
185    fn build_inner(&self) -> Query {
186        Query {
187            cover: Some(Cover {
188                domain: self.domain.clone(),
189                root: self.root.map(uuid_to_proto),
190                correlation_id: self.correlation_id.clone().unwrap_or_default(),
191                edition: self.edition.clone().map(Edition::from),
192            }),
193            selection: self.selection.clone(),
194        }
195    }
196
197    /// Execute the query and return the EventBook.
198    pub async fn get_events(self) -> Result<EventBook> {
199        let client = self.client;
200        let query = self.build_inner();
201        client.get_events(query).await
202    }
203
204    /// Execute the query and return just the event pages.
205    pub async fn get_pages(self) -> Result<Vec<EventPage>> {
206        let client = self.client;
207        let query = self.build_inner();
208        let event_book = client.get_events(query).await?;
209        Ok(event_book.pages)
210    }
211}
212
213/// Extension trait for creating command builders.
214pub trait CommandBuilderExt: traits::GatewayClient + Sized {
215    /// Start building a command for the given domain and root.
216    fn command(&self, domain: impl Into<String>, root: Uuid) -> CommandBuilder<'_, Self> {
217        CommandBuilder::new(self, domain, root)
218    }
219}
220
221impl<T: traits::GatewayClient> CommandBuilderExt for T {}
222
223/// Extension trait for creating query builders.
224pub trait QueryBuilderExt: traits::QueryClient + Sized {
225    /// Start building a query for the given domain and root.
226    fn query(&self, domain: impl Into<String>, root: Uuid) -> QueryBuilder<'_, Self> {
227        QueryBuilder::new(self, domain, Some(root))
228    }
229
230    /// Start building a query by domain only (use with by_correlation_id).
231    fn query_domain(&self, domain: impl Into<String>) -> QueryBuilder<'_, Self> {
232        QueryBuilder::new(self, domain, None)
233    }
234}
235
236impl<T: traits::QueryClient> QueryBuilderExt for T {}
237
238/// Helper to extract the root UUID from a Cover.
239pub fn root_from_cover(cover: &Cover) -> Option<Uuid> {
240    cover
241        .root
242        .as_ref()
243        .and_then(|r| Uuid::from_slice(&r.value).ok())
244}
245
246/// Helper to extract events from a CommandResponse.
247pub fn events_from_response(response: &CommandResponse) -> &[EventPage] {
248    response
249        .events
250        .as_ref()
251        .map(|e| e.pages.as_slice())
252        .unwrap_or(&[])
253}
254
255/// Helper to decode an event payload if the type URL matches.
256pub fn decode_event<M: Message + Default>(event: &EventPage, type_suffix: &str) -> Option<M> {
257    let any = match &event.payload {
258        Some(crate::proto::event_page::Payload::Event(e)) => e,
259        _ => return None,
260    };
261    if !any.type_url.ends_with(type_suffix) {
262        return None;
263    }
264    M::decode(any.value.as_slice()).ok()
265}
266
267#[cfg(test)]
268mod tests {
269    use super::*;
270    use crate::proto::{Cover, Uuid as ProtoUuid};
271    use async_trait::async_trait;
272
273    // Mock client for testing QueryBuilder
274    struct MockQueryClient {
275        event_book: EventBook,
276    }
277
278    #[async_trait]
279    impl traits::QueryClient for MockQueryClient {
280        async fn get_events(&self, _query: Query) -> Result<EventBook> {
281            Ok(self.event_book.clone())
282        }
283    }
284
285    // Mock client for testing CommandBuilder
286    struct MockGatewayClient {
287        response: CommandResponse,
288    }
289
290    #[async_trait]
291    impl traits::GatewayClient for MockGatewayClient {
292        async fn execute(&self, _command: CommandBook) -> Result<CommandResponse> {
293            Ok(self.response.clone())
294        }
295    }
296
297    fn make_cover(domain: &str, correlation_id: &str, root: Option<Uuid>) -> Cover {
298        Cover {
299            domain: domain.to_string(),
300            correlation_id: correlation_id.to_string(),
301            root: root.map(|u| ProtoUuid {
302                value: u.as_bytes().to_vec(),
303            }),
304            edition: None,
305        }
306    }
307
308    // CommandBuilder tests
309    #[test]
310    fn test_command_builder_with_correlation_id() {
311        let client = MockGatewayClient {
312            response: CommandResponse::default(),
313        };
314        let root = Uuid::new_v4();
315        let builder = CommandBuilder::new(&client, "orders", root).with_correlation_id("corr-123");
316
317        assert_eq!(builder.correlation_id, Some("corr-123".to_string()));
318    }
319
320    #[test]
321    fn test_command_builder_with_sequence() {
322        let client = MockGatewayClient {
323            response: CommandResponse::default(),
324        };
325        let root = Uuid::new_v4();
326        let builder = CommandBuilder::new(&client, "orders", root).with_sequence(42);
327
328        assert_eq!(builder.sequence, Some(42));
329    }
330
331    #[test]
332    fn test_command_builder_with_command() {
333        let client = MockGatewayClient {
334            response: CommandResponse::default(),
335        };
336        let root = Uuid::new_v4();
337        let msg = prost_types::Duration {
338            seconds: 42,
339            nanos: 0,
340        };
341        let builder = CommandBuilder::new(&client, "orders", root)
342            .with_command("type.googleapis.com/test.Command", &msg);
343
344        assert_eq!(
345            builder.type_url,
346            Some("type.googleapis.com/test.Command".to_string())
347        );
348        assert!(builder.payload.is_some());
349    }
350
351    #[test]
352    fn test_command_builder_build_success() {
353        let client = MockGatewayClient {
354            response: CommandResponse::default(),
355        };
356        let root = Uuid::new_v4();
357        let msg = prost_types::Duration {
358            seconds: 42,
359            nanos: 0,
360        };
361        let cmd = CommandBuilder::new(&client, "orders", root)
362            .with_correlation_id("corr-123")
363            .with_sequence(5)
364            .with_command("type.googleapis.com/test.Command", &msg)
365            .build()
366            .unwrap();
367
368        let cover = cmd.cover.unwrap();
369        assert_eq!(cover.domain, "orders");
370        assert_eq!(cover.correlation_id, "corr-123");
371        assert!(cover.root.is_some());
372        assert_eq!(cmd.pages.len(), 1);
373        // Check sequence via header
374        let header = cmd.pages[0].header.as_ref().unwrap();
375        match &header.sequence_type {
376            Some(SequenceType::Sequence(seq)) => assert_eq!(*seq, 5),
377            _ => panic!("expected explicit sequence"),
378        }
379    }
380
381    #[test]
382    fn test_command_builder_build_generates_correlation_id() {
383        let client = MockGatewayClient {
384            response: CommandResponse::default(),
385        };
386        let root = Uuid::new_v4();
387        let msg = prost_types::Duration {
388            seconds: 42,
389            nanos: 0,
390        };
391        let cmd = CommandBuilder::new(&client, "orders", root)
392            .with_sequence(0)
393            .with_command("type.googleapis.com/test.Command", &msg)
394            .build()
395            .unwrap();
396
397        let cover = cmd.cover.unwrap();
398        assert!(!cover.correlation_id.is_empty());
399    }
400
401    #[test]
402    fn test_command_builder_build_missing_type_url() {
403        let client = MockGatewayClient {
404            response: CommandResponse::default(),
405        };
406        let root = Uuid::new_v4();
407        let result = CommandBuilder::new(&client, "orders", root)
408            .with_sequence(0)
409            .build();
410
411        assert!(result.is_err());
412        let err = result.unwrap_err();
413        assert!(err.is_invalid_argument());
414    }
415
416    #[test]
417    fn test_command_builder_build_missing_payload() {
418        let client = MockGatewayClient {
419            response: CommandResponse::default(),
420        };
421        let root = Uuid::new_v4();
422        let mut builder = CommandBuilder::new(&client, "orders", root);
423        builder.type_url = Some("type.googleapis.com/test".to_string());
424        builder.sequence = Some(0);
425        let result = builder.build();
426
427        assert!(result.is_err());
428    }
429
430    #[test]
431    fn test_command_builder_build_missing_sequence() {
432        let client = MockGatewayClient {
433            response: CommandResponse::default(),
434        };
435        let root = Uuid::new_v4();
436        let msg = prost_types::Duration {
437            seconds: 42,
438            nanos: 0,
439        };
440        let result = CommandBuilder::new(&client, "orders", root)
441            .with_command("type.googleapis.com/test.Command", &msg)
442            .build();
443
444        assert!(result.is_err());
445        let err = result.unwrap_err();
446        assert!(err.is_invalid_argument());
447    }
448
449    // QueryBuilder tests
450    #[test]
451    fn test_query_builder_by_correlation_id() {
452        let client = MockQueryClient {
453            event_book: EventBook::default(),
454        };
455        let root = Uuid::new_v4();
456        let builder =
457            QueryBuilder::new(&client, "orders", Some(root)).by_correlation_id("corr-123");
458
459        assert_eq!(builder.correlation_id, Some("corr-123".to_string()));
460        assert!(builder.root.is_none()); // root should be cleared
461    }
462
463    #[test]
464    fn test_query_builder_edition() {
465        let client = MockQueryClient {
466            event_book: EventBook::default(),
467        };
468        let builder = QueryBuilder::new(&client, "orders", None).edition("test-edition");
469
470        assert_eq!(builder.edition, Some("test-edition".to_string()));
471    }
472
473    #[test]
474    fn test_query_builder_range() {
475        let client = MockQueryClient {
476            event_book: EventBook::default(),
477        };
478        let builder = QueryBuilder::new(&client, "orders", None).range(10);
479
480        match builder.selection {
481            Some(Selection::Range(r)) => {
482                assert_eq!(r.lower, 10);
483                assert!(r.upper.is_none());
484            }
485            _ => panic!("expected Range selection"),
486        }
487    }
488
489    #[test]
490    fn test_query_builder_range_to() {
491        let client = MockQueryClient {
492            event_book: EventBook::default(),
493        };
494        let builder = QueryBuilder::new(&client, "orders", None).range_to(5, 15);
495
496        match builder.selection {
497            Some(Selection::Range(r)) => {
498                assert_eq!(r.lower, 5);
499                assert_eq!(r.upper, Some(15));
500            }
501            _ => panic!("expected Range selection"),
502        }
503    }
504
505    #[test]
506    fn test_query_builder_as_of_sequence() {
507        let client = MockQueryClient {
508            event_book: EventBook::default(),
509        };
510        let builder = QueryBuilder::new(&client, "orders", None).as_of_sequence(42);
511
512        match builder.selection {
513            Some(Selection::Temporal(t)) => match t.point_in_time {
514                Some(PointInTime::AsOfSequence(s)) => assert_eq!(s, 42),
515                _ => panic!("expected AsOfSequence"),
516            },
517            _ => panic!("expected Temporal selection"),
518        }
519    }
520
521    #[test]
522    fn test_query_builder_as_of_time_valid() {
523        let client = MockQueryClient {
524            event_book: EventBook::default(),
525        };
526        let builder = QueryBuilder::new(&client, "orders", None)
527            .as_of_time("2024-01-15T10:30:00Z")
528            .unwrap();
529
530        match builder.selection {
531            Some(Selection::Temporal(t)) => match t.point_in_time {
532                Some(PointInTime::AsOfTime(ts)) => assert_eq!(ts.seconds, 1705314600),
533                _ => panic!("expected AsOfTime"),
534            },
535            _ => panic!("expected Temporal selection"),
536        }
537    }
538
539    #[test]
540    fn test_query_builder_as_of_time_invalid() {
541        let client = MockQueryClient {
542            event_book: EventBook::default(),
543        };
544        let result = QueryBuilder::new(&client, "orders", None).as_of_time("not a timestamp");
545
546        assert!(result.is_err());
547    }
548
549    #[test]
550    fn test_query_builder_build() {
551        let client = MockQueryClient {
552            event_book: EventBook::default(),
553        };
554        let root = Uuid::new_v4();
555        let query = QueryBuilder::new(&client, "orders", Some(root))
556            .edition("test-edition")
557            .range(10)
558            .build();
559
560        let cover = query.cover.unwrap();
561        assert_eq!(cover.domain, "orders");
562        assert!(cover.root.is_some());
563        assert!(cover.edition.is_some());
564        assert!(query.selection.is_some());
565    }
566
567    #[test]
568    fn test_query_builder_build_with_correlation_id() {
569        let client = MockQueryClient {
570            event_book: EventBook::default(),
571        };
572        let query = QueryBuilder::new(&client, "orders", None)
573            .by_correlation_id("corr-123")
574            .build();
575
576        let cover = query.cover.unwrap();
577        assert_eq!(cover.correlation_id, "corr-123");
578        assert!(cover.root.is_none());
579    }
580
581    // Helper function tests
582    #[test]
583    fn test_root_from_cover_some() {
584        let root = Uuid::new_v4();
585        let cover = make_cover("orders", "", Some(root));
586        assert_eq!(root_from_cover(&cover), Some(root));
587    }
588
589    #[test]
590    fn test_root_from_cover_none() {
591        let cover = make_cover("orders", "", None);
592        assert_eq!(root_from_cover(&cover), None);
593    }
594
595    #[test]
596    fn test_root_from_cover_invalid_uuid() {
597        let cover = Cover {
598            domain: "orders".to_string(),
599            correlation_id: String::new(),
600            root: Some(ProtoUuid {
601                value: vec![1, 2, 3], // invalid - not 16 bytes
602            }),
603            edition: None,
604        };
605        assert_eq!(root_from_cover(&cover), None);
606    }
607
608    #[test]
609    fn test_events_from_response_with_events() {
610        let events = EventBook {
611            cover: None,
612            pages: vec![EventPage::default(), EventPage::default()],
613            snapshot: None,
614            next_sequence: 0,
615        };
616        let response = CommandResponse {
617            events: Some(events),
618            ..Default::default()
619        };
620
621        let pages = events_from_response(&response);
622        assert_eq!(pages.len(), 2);
623    }
624
625    #[test]
626    fn test_events_from_response_no_events() {
627        let response = CommandResponse {
628            events: None,
629            ..Default::default()
630        };
631
632        let pages = events_from_response(&response);
633        assert!(pages.is_empty());
634    }
635
636    #[test]
637    fn test_decode_event_success() {
638        use crate::proto::event_page::Payload;
639
640        // Use prost_types::Duration which implements Message + Default
641        let msg = prost_types::Duration {
642            seconds: 42,
643            nanos: 0,
644        };
645        let event = EventPage {
646            header: Some(PageHeader {
647                sequence_type: Some(SequenceType::Sequence(1)),
648            }),
649            created_at: None,
650            payload: Some(Payload::Event(prost_types::Any {
651                type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
652                value: msg.encode_to_vec(),
653            })),
654        };
655
656        let decoded: Option<prost_types::Duration> = decode_event(&event, "Duration");
657        assert!(decoded.is_some());
658        assert_eq!(decoded.unwrap().seconds, 42);
659    }
660
661    #[test]
662    fn test_decode_event_type_mismatch() {
663        use crate::proto::event_page::Payload;
664
665        let msg = prost_types::Duration {
666            seconds: 42,
667            nanos: 0,
668        };
669        let event = EventPage {
670            header: Some(PageHeader {
671                sequence_type: Some(SequenceType::Sequence(1)),
672            }),
673            created_at: None,
674            payload: Some(Payload::Event(prost_types::Any {
675                type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
676                value: msg.encode_to_vec(),
677            })),
678        };
679
680        let decoded: Option<prost_types::Duration> = decode_event(&event, "Timestamp");
681        assert!(decoded.is_none());
682    }
683
684    #[test]
685    fn test_decode_event_nil_event() {
686        let event = EventPage {
687            header: Some(PageHeader {
688                sequence_type: Some(SequenceType::Sequence(1)),
689            }),
690            created_at: None,
691            payload: None,
692        };
693
694        let decoded: Option<prost_types::Duration> = decode_event(&event, "Duration");
695        assert!(decoded.is_none());
696    }
697
698    #[test]
699    fn test_decode_event_invalid_payload() {
700        use crate::proto::event_page::Payload;
701
702        let event = EventPage {
703            header: Some(PageHeader {
704                sequence_type: Some(SequenceType::Sequence(1)),
705            }),
706            created_at: None,
707            payload: Some(Payload::Event(prost_types::Any {
708                type_url: "type.googleapis.com/google.protobuf.Duration".to_string(),
709                value: vec![0xFF, 0xFF, 0xFF], // garbage
710            })),
711        };
712
713        let decoded: Option<prost_types::Duration> = decode_event(&event, "Duration");
714        assert!(decoded.is_none());
715    }
716
717    // Extension trait tests
718    #[test]
719    fn test_command_builder_ext_command() {
720        let client = MockGatewayClient {
721            response: CommandResponse::default(),
722        };
723        let root = Uuid::new_v4();
724        let builder = client.command("orders", root);
725
726        assert_eq!(builder.domain, "orders");
727        assert_eq!(builder.root, root);
728    }
729
730    #[test]
731    fn test_command_builder_with_merge_strategy() {
732        let client = MockGatewayClient {
733            response: CommandResponse::default(),
734        };
735        let root = Uuid::new_v4();
736        let builder = CommandBuilder::new(&client, "orders", root)
737            .with_merge_strategy(crate::proto::MergeStrategy::MergeStrict);
738
739        assert_eq!(
740            builder.merge_strategy,
741            crate::proto::MergeStrategy::MergeStrict
742        );
743    }
744
745    #[test]
746    fn test_query_builder_ext_query() {
747        let client = MockQueryClient {
748            event_book: EventBook::default(),
749        };
750        let root = Uuid::new_v4();
751        let builder = client.query("orders", root);
752
753        assert_eq!(builder.domain, "orders");
754        assert_eq!(builder.root, Some(root));
755    }
756
757    #[test]
758    fn test_query_builder_ext_query_domain() {
759        let client = MockQueryClient {
760            event_book: EventBook::default(),
761        };
762        let builder = client.query_domain("orders");
763
764        assert_eq!(builder.domain, "orders");
765        assert!(builder.root.is_none());
766    }
767}