// umadb_dcb/lib.rs

//! API for Dynamic Consistency Boundaries (DCB) event store
//!
//! This module provides the core interfaces and data structures for working with
//! an event store that supports dynamic consistency boundaries.
5
6use async_trait::async_trait;
7use futures_core::Stream;
8use futures_util::StreamExt;
9use std::iter::Iterator;
10use thiserror::Error;
11use uuid::Uuid;
12
13/// Non-async Rust interface for recording and retrieving events
14pub trait DcbEventStoreSync {
15    /// Reads events from the store based on the provided query and constraints
16    ///
17    /// Returns a `DcbReadResponseSync` that provides an iterator over all events,
18    /// unless 'from' is given then only those with position greater than 'after',
19    /// and unless any query items are given, then only those that match at least one
20    /// query item. An event matches a query item if its type is in the item types or
21    /// there are no item types, and if all the item tags are in the event tags.
22    fn read(
23        &self,
24        query: Option<DcbQuery>,
25        start: Option<u64>,
26        backwards: bool,
27        limit: Option<u32>,
28        subscribe: bool, // Deprecated - remove in v1.0.
29    ) -> DcbResult<Box<dyn DcbReadResponseSync + Send + 'static>>;
30
31    /// Reads events from the store and returns them as a tuple of `(Vec<DcbSequencedEvent>, Option<u64>)`
32    fn read_with_head(
33        &self,
34        query: Option<DcbQuery>,
35        start: Option<u64>,
36        backwards: bool,
37        limit: Option<u32>,
38    ) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
39        let mut response = self.read(query, start, backwards, limit, false)?;
40        response.collect_with_head()
41    }
42
43    /// Returns the current head position of the event store, or None if empty
44    ///
45    /// Returns the value of `last_committed_position`, or `None` if `last_committed_position` is zero
46    fn head(&self) -> DcbResult<Option<u64>>;
47
48    /// Returns the greatest recorded upstream position for a tracking source, if any
49    fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>>;
50
51    /// Appends given events to the event store, unless the condition fails
52    ///
53    /// Returns the position of the last appended event
54    fn append(
55        &self,
56        events: Vec<DcbEvent>,
57        condition: Option<DcbAppendCondition>,
58        tracking_info: Option<TrackingInfo>,
59    ) -> DcbResult<u64>;
60}
61
/// Response from a read operation, providing an iterator over sequenced events
///
/// Each iterator item is a `DcbResult`, so a failure while reading is reported
/// as an `Err` item rather than by ending the iteration silently.
pub trait DcbReadResponseSync: Iterator<Item = DcbResult<DcbSequencedEvent>> + Send {
    /// Returns the current head position of the event store, or None if empty
    fn head(&mut self) -> DcbResult<Option<u64>>;
    /// Returns the events of this read as a vector, together with the head position
    fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)>;
    /// Returns the next batch of events for this read. Implementations may buffer
    /// events per underlying transport message ("batch"). If there are no more
    /// events available, returns an empty Vec. The head() method should reflect
    /// the latest known head as reported by the underlying store.
    fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
}
74
/// Response from a subscribe operation, providing an iterator over sequenced events
///
/// Each iterator item is a `DcbResult`, so failures surface as `Err` items.
pub trait DcbSubscriptionSync: Iterator<Item = DcbResult<DcbSequencedEvent>> + Send {
    /// Returns the next batch of events for this subscription. Implementations may
    /// buffer events per underlying transport message ("batch"). If there are no
    /// more events available, returns an empty Vec.
    fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
}
82
83/// Async Rust interface for recording and retrieving events
84#[async_trait]
85pub trait DcbEventStoreAsync: Send + Sync {
86    /// Reads events from the store based on the provided query and constraints
87    ///
88    /// Returns a `DcbReadResponseSync` that provides an iterator over all events,
89    /// unless 'after' is given then only those with position greater than 'after',
90    /// and unless any query items are given, then only those that match at least one
91    /// query item. An event matches a query item if its type is in the item types or
92    /// there are no item types, and if all the item tags are in the event tags.
93    async fn read<'a>(
94        &'a self,
95        query: Option<DcbQuery>,
96        start: Option<u64>,
97        backwards: bool,
98        limit: Option<u32>,
99        subscribe: bool,
100    ) -> DcbResult<Box<dyn DcbReadResponseAsync + Send + 'static>>;
101
102    /// Reads events from the store and returns them as a tuple of `(Vec<DcbSequencedEvent>, Option<u64>)`
103    async fn read_with_head<'a>(
104        &'a self,
105        query: Option<DcbQuery>,
106        after: Option<u64>,
107        backwards: bool,
108        limit: Option<u32>,
109    ) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
110        let mut response = self.read(query, after, backwards, limit, false).await?;
111        response.collect_with_head().await
112    }
113
114    /// Returns the current head position of the event store, or None if empty
115    ///
116    /// Returns the value of last_committed_position, or None if last_committed_position is zero
117    async fn head(&self) -> DcbResult<Option<u64>>;
118
119    /// Returns the greatest recorded upstream position for a tracking source, if any
120    async fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>>;
121
122    /// Appends given events to the event store, unless the condition fails
123    ///
124    /// Returns the position of the last appended event
125    async fn append(
126        &self,
127        events: Vec<DcbEvent>,
128        condition: Option<DcbAppendCondition>,
129        tracking_info: Option<TrackingInfo>,
130    ) -> DcbResult<u64>;
131}
132
133/// Asynchronous response from a read operation, providing a stream of sequenced events
134#[async_trait]
135pub trait DcbReadResponseAsync: Stream<Item = DcbResult<DcbSequencedEvent>> + Send + Unpin {
136    async fn head(&mut self) -> DcbResult<Option<u64>>;
137
138    async fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
139        let mut events = Vec::new();
140        while let Some(result) = self.next().await {
141            events.push(result?); // propagate error from stream
142        }
143
144        let head = self.head().await?;
145        Ok((events, head))
146    }
147
148    async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
149}
150
/// Asynchronous response from a subscribe operation, providing a stream of sequenced events
///
/// Each stream item is a `DcbResult`, so failures surface as `Err` items.
#[async_trait]
pub trait DcbSubscriptionAsync: Stream<Item = DcbResult<DcbSequencedEvent>> + Send + Unpin {
    /// Returns the next batch of events for this subscription
    async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
}
156
/// A single filter clause of a [`DcbQuery`]
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DcbQueryItem {
    /// Event types to match
    pub types: Vec<String>,
    /// Tags that must all be present in the event
    pub tags: Vec<String>,
}

impl DcbQueryItem {
    /// Creates a query item with no types and no tags
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the types of this query item with the given ones
    pub fn types<I, S>(self, types: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            types: types.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Replaces the tags of this query item with the given ones
    pub fn tags<I, S>(self, tags: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            tags: tags.into_iter().map(Into::into).collect(),
            ..self
        }
    }
}
196
197/// A query composed of multiple query items
198#[derive(Debug, Clone, Default)]
199#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
200pub struct DcbQuery {
201    /// List of query items, where events matching any item are included in results
202    pub items: Vec<DcbQueryItem>,
203}
204
205impl DcbQuery {
206    /// Creates a new empty query
207    pub fn new() -> Self {
208        Self { items: Vec::new() }
209    }
210
211    /// Creates a query with the specified items
212    pub fn with_items<I>(items: I) -> Self
213    where
214        I: IntoIterator<Item = DcbQueryItem>,
215    {
216        Self {
217            items: items.into_iter().collect(),
218        }
219    }
220
221    /// Adds a query item to this query
222    pub fn item(mut self, item: DcbQueryItem) -> Self {
223        self.items.push(item);
224        self
225    }
226
227    /// Adds multiple query items to this query
228    pub fn items<I>(mut self, items: I) -> Self
229    where
230        I: IntoIterator<Item = DcbQueryItem>,
231    {
232        self.items.extend(items);
233        self
234    }
235}
236
237/// Conditions that must be satisfied for an append operation to succeed
238#[derive(Debug, Clone, Default)]
239#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
240pub struct DcbAppendCondition {
241    /// Query that, if matching any events, will cause the append to fail
242    pub fail_if_events_match: DcbQuery,
243    /// Position after which to append; if None, append at the end
244    pub after: Option<u64>,
245}
246
247impl DcbAppendCondition {
248    /// Creates a new empty append condition
249    pub fn new(fail_if_events_match: DcbQuery) -> Self {
250        Self {
251            fail_if_events_match,
252            after: None,
253        }
254    }
255
256    pub fn after(mut self, after: Option<u64>) -> Self {
257        self.after = after;
258        self
259    }
260}
261
262/// Represents an event in the event store
263#[derive(Debug, Clone)]
264#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
265pub struct DcbEvent {
266    /// Type of the event
267    pub event_type: String,
268    /// Tags associated with the event
269    pub tags: Vec<String>,
270    /// Binary data associated with the event
271    pub data: Vec<u8>,
272    /// Unique event ID
273    pub uuid: Option<Uuid>,
274}
275
276impl Default for DcbEvent {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282impl DcbEvent {
283    /// Creates a new event
284    pub fn new() -> Self {
285        Self {
286            event_type: "".to_string(),
287            data: Vec::new(),
288            tags: Vec::new(),
289            uuid: None,
290        }
291    }
292
293    /// Sets the type for this event
294    pub fn event_type<S: Into<String>>(mut self, event_type: S) -> Self {
295        self.event_type = event_type.into();
296        self
297    }
298
299    /// Sets the data for this event
300    pub fn data<D: Into<Vec<u8>>>(mut self, data: D) -> Self {
301        self.data = data.into();
302        self
303    }
304
305    /// Sets the tags for this event
306    pub fn tags<I, S>(mut self, tags: I) -> Self
307    where
308        I: IntoIterator<Item = S>,
309        S: Into<String>,
310    {
311        self.tags = tags.into_iter().map(|s| s.into()).collect();
312        self
313    }
314
315    /// Sets the UUID for this event
316    pub fn uuid(mut self, uuid: Uuid) -> Self {
317        self.uuid = Some(uuid);
318        self
319    }
320}
321
/// Tracking record that can be passed to `append` alongside the events
///
/// `get_tracking_info` on the store traits reports the greatest recorded
/// upstream position for a given tracking source.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TrackingInfo {
    /// Name of the tracking source
    pub source: String,
    /// Position within that source
    // NOTE(review): presumably the upstream position that has been processed; confirm.
    pub position: u64,
}
328
/// An event with its position in the event sequence
///
/// This is the item type produced by the read and subscription traits.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DcbSequencedEvent {
    /// Position of the event in the sequence
    pub position: u64,
    /// The event
    pub event: DcbEvent,
}
338
// Error types
/// Error type shared by every operation in this crate (see `DcbResult`)
///
/// The `Display` text of each variant is given by its `#[error(...)]` attribute.
/// With the `serde` feature enabled, the wrapped `std::io::Error` is
/// (de)serialized through the `serde_io_error` helper module.
#[derive(Error, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum DcbError {
    // Generic/system errors
    /// Wrapped I/O error; `std::io::Error` converts into this variant via `From`
    #[error("io error: {0}")]
    #[cfg_attr(feature = "serde", serde(with = "serde_io_error"))]
    Io(#[from] std::io::Error),

    // DCB domain errors
    #[error("integrity error: condition failed: {0}")]
    IntegrityError(String),
    #[error("corruption detected: {0}")]
    Corruption(String),
    /// Invalid input argument provided by the caller
    #[error("invalid argument: {0}")]
    InvalidArgument(String),

    // Storage errors (unified into DcbError)
    #[error("initialization error: {0}")]
    InitializationError(String),
    #[error("page not found: {0}")]
    PageNotFound(u64),
    #[error("dirty page not found: {0}")]
    DirtyPageNotFound(u64),
    #[error("root ID mismatched: old {0} new {1}")]
    RootIDMismatch(u64, u64),
    #[error("database corrupted: {0}")]
    DatabaseCorrupted(String),
    #[error("internal error: {0}")]
    InternalError(String),
    #[error("serialization error: {0}")]
    SerializationError(String),
    #[error("deserialization error: {0}")]
    DeserializationError(String),
    #[error("page already freed: {0}")]
    PageAlreadyFreed(u64),
    #[error("page already dirty: {0}")]
    PageAlreadyDirty(u64),
    #[error("transport error: {0}")]
    TransportError(String),
    #[error("cancelled by user")]
    CancelledByUser(),

    // Authentication error
    #[error("authentication error: {0}")]
    AuthenticationError(String),
}
387
/// Result alias used throughout this crate, with `DcbError` as the error type
pub type DcbResult<T> = Result<T, DcbError>;
389
/// Serde adapter for `std::io::Error`, used through `#[serde(with = "serde_io_error")]`
///
/// An I/O error is written as a `{ kind, message }` record. `kind` is the name
/// of the `io::ErrorKind` when it appears in `KIND_NAMES`, otherwise `None`.
/// `message` is the error's `Display` text.
#[cfg(feature = "serde")]
mod serde_io_error {
    use std::{borrow::Cow, io};

    use serde::{Deserialize, Serialize};

    /// Single source of truth for the `io::ErrorKind` <-> wire-name mapping, so
    /// that serialization and deserialization cannot drift apart.
    const KIND_NAMES: &[(io::ErrorKind, &str)] = &[
        (io::ErrorKind::NotFound, "NotFound"),
        (io::ErrorKind::PermissionDenied, "PermissionDenied"),
        (io::ErrorKind::ConnectionRefused, "ConnectionRefused"),
        (io::ErrorKind::ConnectionReset, "ConnectionReset"),
        (io::ErrorKind::HostUnreachable, "HostUnreachable"),
        (io::ErrorKind::NetworkUnreachable, "NetworkUnreachable"),
        (io::ErrorKind::ConnectionAborted, "ConnectionAborted"),
        (io::ErrorKind::NotConnected, "NotConnected"),
        (io::ErrorKind::AddrInUse, "AddrInUse"),
        (io::ErrorKind::AddrNotAvailable, "AddrNotAvailable"),
        (io::ErrorKind::NetworkDown, "NetworkDown"),
        (io::ErrorKind::BrokenPipe, "BrokenPipe"),
        (io::ErrorKind::AlreadyExists, "AlreadyExists"),
        (io::ErrorKind::WouldBlock, "WouldBlock"),
        (io::ErrorKind::NotADirectory, "NotADirectory"),
        (io::ErrorKind::IsADirectory, "IsADirectory"),
        (io::ErrorKind::DirectoryNotEmpty, "DirectoryNotEmpty"),
        (io::ErrorKind::ReadOnlyFilesystem, "ReadOnlyFilesystem"),
        (io::ErrorKind::StaleNetworkFileHandle, "StaleNetworkFileHandle"),
        (io::ErrorKind::InvalidInput, "InvalidInput"),
        (io::ErrorKind::InvalidData, "InvalidData"),
        (io::ErrorKind::TimedOut, "TimedOut"),
        (io::ErrorKind::WriteZero, "WriteZero"),
        (io::ErrorKind::StorageFull, "StorageFull"),
        (io::ErrorKind::NotSeekable, "NotSeekable"),
        (io::ErrorKind::QuotaExceeded, "QuotaExceeded"),
        (io::ErrorKind::FileTooLarge, "FileTooLarge"),
        (io::ErrorKind::ResourceBusy, "ResourceBusy"),
        (io::ErrorKind::ExecutableFileBusy, "ExecutableFileBusy"),
        (io::ErrorKind::Deadlock, "Deadlock"),
        (io::ErrorKind::CrossesDevices, "CrossesDevices"),
        (io::ErrorKind::TooManyLinks, "TooManyLinks"),
        (io::ErrorKind::InvalidFilename, "InvalidFilename"),
        (io::ErrorKind::ArgumentListTooLong, "ArgumentListTooLong"),
        (io::ErrorKind::Interrupted, "Interrupted"),
        (io::ErrorKind::Unsupported, "Unsupported"),
        (io::ErrorKind::UnexpectedEof, "UnexpectedEof"),
        (io::ErrorKind::OutOfMemory, "OutOfMemory"),
        (io::ErrorKind::Other, "Other"),
    ];

    /// Wire representation of an `io::Error`
    #[derive(Serialize, Deserialize)]
    struct IoError {
        kind: Option<Cow<'static, str>>,
        message: Option<String>,
    }

    /// Returns the wire name of `kind`, or `None` when it is not in `KIND_NAMES`
    fn kind_name(kind: io::ErrorKind) -> Option<&'static str> {
        KIND_NAMES
            .iter()
            .find(|(candidate, _)| *candidate == kind)
            .map(|(_, name)| *name)
    }

    /// Returns the `io::ErrorKind` for a wire name, or `None` when it is unknown
    fn kind_from_name(name: &str) -> Option<io::ErrorKind> {
        KIND_NAMES
            .iter()
            .find(|(_, candidate)| *candidate == name)
            .map(|(kind, _)| *kind)
    }

    /// Serializes `err` as a `{ kind, message }` record
    ///
    /// The message is the error's `Display` text. `get_ref()` is `None` for OS
    /// and simple-kind errors, so relying on it would drop their description.
    pub fn serialize<S>(err: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        IoError {
            kind: kind_name(err.kind()).map(Cow::Borrowed),
            message: Some(err.to_string()),
        }
        .serialize(serializer)
    }

    /// Rebuilds an `io::Error` from a `{ kind, message }` record
    ///
    /// A missing or unrecognized kind becomes `io::ErrorKind::Other`; a missing
    /// message becomes `"unknown error"`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let io_err: IoError = <IoError as Deserialize>::deserialize(deserializer)?;
        let kind = io_err
            .kind
            .as_deref()
            .and_then(kind_from_name)
            .unwrap_or(io::ErrorKind::Other);

        Ok(io::Error::new(
            kind,
            io_err
                .message
                .unwrap_or_else(|| "unknown error".to_string()),
        ))
    }
}
512
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory `DcbReadResponseSync` that replays a fixed list of events
    struct TestReadResponse {
        events: Vec<DcbSequencedEvent>,
        current_index: usize,
        head_position: Option<u64>,
    }

    impl TestReadResponse {
        fn new(events: Vec<DcbSequencedEvent>, head_position: Option<u64>) -> Self {
            Self {
                events,
                current_index: 0,
                head_position,
            }
        }
    }

    impl Iterator for TestReadResponse {
        type Item = DcbResult<DcbSequencedEvent>;

        fn next(&mut self) -> Option<Self::Item> {
            // `get` returns None once every event has been replayed.
            let event = self.events.get(self.current_index)?.clone();
            self.current_index += 1;
            Some(Ok(event))
        }
    }

    impl DcbReadResponseSync for TestReadResponse {
        fn head(&mut self) -> DcbResult<Option<u64>> {
            Ok(self.head_position)
        }

        fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
            let events = self.next_batch()?;
            let head = self.head()?;
            Ok((events, head))
        }

        fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
            // Collecting into a `Result` stops at, and returns, the first error
            // instead of panicking.
            self.by_ref().collect()
        }
    }

    /// Builds a sequenced event with the given type and position and no payload
    fn sequenced(event_type: &str, position: u64) -> DcbSequencedEvent {
        DcbSequencedEvent {
            position,
            event: DcbEvent::default().event_type(event_type),
        }
    }

    #[test]
    fn test_dcb_read_response() {
        // Create some test events
        let event1 = DcbEvent {
            event_type: "test_event".to_string(),
            data: vec![1, 2, 3],
            tags: vec!["tag1".to_string(), "tag2".to_string()],
            uuid: None,
        };

        let event2 = DcbEvent {
            event_type: "another_event".to_string(),
            data: vec![4, 5, 6],
            tags: vec!["tag2".to_string(), "tag3".to_string()],
            uuid: None,
        };

        let seq_event1 = DcbSequencedEvent {
            event: event1,
            position: 1,
        };

        let seq_event2 = DcbSequencedEvent {
            event: event2,
            position: 2,
        };

        // Create a test response
        let mut response = TestReadResponse::new(vec![seq_event1, seq_event2], Some(2));

        // Test head position
        assert_eq!(response.head().unwrap(), Some(2));

        // Test iterator functionality
        assert_eq!(response.next().unwrap().unwrap().position, 1);
        assert_eq!(response.next().unwrap().unwrap().position, 2);
        assert!(response.next().is_none());
    }

    #[test]
    fn test_collect_with_head_and_next_batch() {
        let mut response =
            TestReadResponse::new(vec![sequenced("a", 1), sequenced("b", 2)], Some(2));

        let (events, head) = response.collect_with_head().unwrap();
        let positions: Vec<u64> = events.iter().map(|e| e.position).collect();
        assert_eq!(positions, vec![1, 2]);
        assert_eq!(head, Some(2));

        // Everything has been consumed, so a further batch is empty.
        assert!(response.next_batch().unwrap().is_empty());

        // An empty response yields no events and no head.
        let mut empty = TestReadResponse::new(Vec::new(), None);
        let (events, head) = empty.collect_with_head().unwrap();
        assert!(events.is_empty());
        assert_eq!(head, None);
    }

    #[test]
    fn test_event_new() {
        let event1 = DcbEvent::default()
            .event_type("type1")
            .data(b"data1")
            .tags(["tagX"]);

        // Verify the fields match expectations
        assert_eq!(event1.event_type, "type1");
        assert_eq!(event1.data, b"data1".to_vec());
        assert_eq!(event1.tags, vec!["tagX".to_string()]);
        assert_eq!(event1.uuid, None);

        // Test with multiple tags
        let event2 = DcbEvent::default()
            .event_type("type2")
            .data(b"data2")
            .tags(["tag1", "tag2", "tag3"]);
        assert_eq!(event2.tags.len(), 3);

        // Test without data or tags
        let event3 = DcbEvent::default().event_type("type3");
        assert_eq!(event3.data.len(), 0);
        assert_eq!(event3.tags.len(), 0);

        // Test DcbQueryItem builder
        let query_item = DcbQueryItem::new()
            .types(["type1", "type2"])
            .tags(["tagA", "tagB"]);
        assert_eq!(query_item.types.len(), 2);
        assert_eq!(query_item.tags.len(), 2);

        // Test DcbQuery builder
        let query = DcbQuery::new().item(query_item);
        assert_eq!(query.items.len(), 1);
    }
}
651        assert_eq!(query.items.len(), 1);
652
653        println!("\nAll builder API tests passed!");
654    }
655}