Skip to main content

eventcore_types/
command.rs

1use std::collections::HashSet;
2
3use serde::{Serialize, de::DeserializeOwned};
4use thiserror::Error;
5
6use crate::errors::CommandError;
7use crate::store::StreamId;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct StreamDeclarations {
11    streams: Vec<StreamId>,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq, Error)]
15pub enum StreamDeclarationsError {
16    #[error("commands must declare at least one stream")]
17    Empty,
18    #[error("duplicate stream declared: {duplicate:?}")]
19    DuplicateStream { duplicate: StreamId },
20}
21
22impl StreamDeclarations {
23    pub fn try_from_streams<I>(streams: I) -> Result<Self, StreamDeclarationsError>
24    where
25        I: IntoIterator<Item = StreamId>,
26    {
27        let mut seen = HashSet::new();
28        let mut collected = Vec::new();
29
30        for stream in streams.into_iter() {
31            if !seen.insert(stream.clone()) {
32                return Err(StreamDeclarationsError::DuplicateStream { duplicate: stream });
33            }
34
35            collected.push(stream);
36        }
37
38        if collected.is_empty() {
39            return Err(StreamDeclarationsError::Empty);
40        }
41
42        Ok(Self { streams: collected })
43    }
44
45    pub fn single(stream: StreamId) -> Self {
46        Self {
47            streams: vec![stream],
48        }
49    }
50
51    pub fn with_participant(self, participant: StreamId) -> Result<Self, StreamDeclarationsError> {
52        let mut streams = self.streams;
53        streams.push(participant);
54        Self::try_from_streams(streams)
55    }
56
57    /// Returns true if the declaration contains no streams.
58    ///
59    /// Note: Valid instances constructed via `single()`, `try_from_streams()`, or
60    /// `with_participant()` are never empty. This method exists for API completeness
61    /// (e.g., to satisfy `clippy::len_without_is_empty`).
62    #[must_use]
63    pub fn is_empty(&self) -> bool {
64        self.streams.is_empty()
65    }
66
67    #[must_use]
68    pub fn len(&self) -> usize {
69        self.streams.len()
70    }
71
72    pub fn iter(&self) -> impl Iterator<Item = &StreamId> {
73        self.streams.iter()
74    }
75}
76
77/// Infrastructure trait describing the streams required to execute a command.
78///
79/// Per ADR-006, stream declarations are generated or implemented separately from
80/// the business logic so infrastructure can evolve independently. Commands
81/// typically use [`StreamDeclarations::single`] for single-stream workflows or
82/// [`StreamDeclarations::try_from_streams`] when coordinating multiple streams.
83pub trait CommandStreams {
84    fn stream_declarations(&self) -> StreamDeclarations;
85}
86
87/// Trait for runtime stream discovery when static declarations are insufficient.
88///
89/// Commands implement this trait when related streams cannot be known at compile
90/// time (for example, when a customer ID needs to be resolved from reconstructed
91/// state). Executors call this hook after reading declared streams so commands
92/// can request additional stream IDs to load before running business logic.
93///
94/// Implementations should return unique stream IDs; the executor deduplicates
95/// defensively but redundant IDs still waste I/O. Streams listed here are folded
96/// into the state reconstruction pass and participate in optimistic concurrency
97/// along with the statically declared streams.
98pub trait StreamResolver<State> {
99    /// Discovers additional stream IDs to load based on reconstructed state.
100    fn discover_related_streams(&self, state: &State) -> Vec<StreamId>;
101}
102
103/// Event trait for domain-first event sourcing.
104///
105/// Per ADR-012, domain types implement this trait to become events. The trait provides
106/// the minimal infrastructure contract: events must know their stream identity
107/// (aggregate ID) and support necessary operations for storage and async handling.
108///
109/// # Trait Bounds
110///
111/// * `Clone` - Required for state reconstruction (apply method may need events multiple times)
112/// * `Send` - Required for async storage backends and cross-thread event handling
113/// * `'static` - Required for type erasure in storage and async trait boundaries
114pub trait Event: Clone + Send + Serialize + DeserializeOwned + 'static {
115    /// Returns the stream this event belongs to.
116    ///
117    /// The stream ID represents the aggregate identity in Domain-Driven Design.
118    /// Each domain event knows which aggregate instance it belongs to.
119    fn stream_id(&self) -> &StreamId;
120}
121
122/// Trait defining the business logic of a command.
123///
124/// Commands encapsulate business operations that read from event streams,
125/// reconstruct state, validate business rules, and produce events.
126///
127/// Stream declarations are provided separately via [`CommandStreams`] so that
128/// infrastructure (such as proc-macros defined in ADR-006) can evolve
129/// independently while this trait focuses purely on domain behavior.
130///
131/// Per ADR-012, commands use an associated type for their event type rather than
132/// a generic parameter, providing better type inference and cleaner APIs.
133///
134/// # Associated Types
135///
136/// * `Event` - The domain event type implementing the Event trait
137/// * `State` - The state type reconstructed from events via `apply()`
138pub trait CommandLogic: CommandStreams {
139    /// The domain event type this command produces.
140    ///
141    /// Must implement the Event trait to provide stream identity and
142    /// required infrastructure capabilities.
143    type Event: Event;
144
145    /// The state type accumulated from event history.
146    ///
147    /// This type represents the reconstructed state needed to validate
148    /// business rules and produce events. It's rebuilt from scratch for
149    /// each command execution by applying events via `apply()`.
150    type State: Default;
151
152    /// Reconstruct state by applying a single event.
153    ///
154    /// This method is called once per event in the stream(s) to rebuild
155    /// the complete state needed for command execution. It implements the
156    /// left-fold pattern: `events.fold(State::default(), apply)`.
157    ///
158    /// # Parameters
159    ///
160    /// * `state` - The accumulated state so far
161    /// * `event` - The next event to apply (borrowed reference)
162    ///
163    /// # Returns
164    ///
165    /// The updated state after applying the event
166    fn apply(&self, state: Self::State, event: &Self::Event) -> Self::State;
167
168    /// Execute business logic and produce events.
169    ///
170    /// This method validates business rules using the reconstructed state
171    /// and returns events to be persisted. It's a pure function that
172    /// makes domain decisions without performing I/O or side effects.
173    ///
174    /// # Parameters
175    ///
176    /// * `state` - The reconstructed state from all events
177    ///
178    /// # Returns
179    ///
180    /// * `Ok(NewEvents<Self::Event>)` if business rules pass and events produced
181    /// * `Err(CommandError)` if business rules violated
182    fn handle(&self, state: Self::State) -> Result<NewEvents<Self::Event>, CommandError>;
183
184    /// Returns a runtime stream resolver when the command needs dynamic discovery.
185    ///
186    /// Commands that implement [`StreamResolver`] can return `Some(self)` or a
187    /// dedicated resolver type so the executor loads additional streams after
188    /// reconstructing state. The default implementation returns `None`, meaning
189    /// the command relies solely on static [`CommandStreams`] declarations.
190    fn stream_resolver(&self) -> Option<&(dyn StreamResolver<Self::State> + Sync)> {
191        None
192    }
193}
194
195/// Collection of new events produced by a command.
196///
197/// This type represents the output of `CommandLogic::handle()` - the
198/// events that should be persisted as a result of command execution.
199///
200/// Per ADR-012, this works with domain event types that implement the Event trait.
201pub struct NewEvents<E: Event> {
202    events: Vec<E>,
203}
204
205impl<E: Event> From<Vec<E>> for NewEvents<E> {
206    fn from(events: Vec<E>) -> Self {
207        Self { events }
208    }
209}
210
211impl<E: Event> From<NewEvents<E>> for Vec<E> {
212    fn from(new_events: NewEvents<E>) -> Self {
213        new_events.events
214    }
215}
216
217impl<E: Event> Default for NewEvents<E> {
218    fn default() -> Self {
219        Self { events: Vec::new() }
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226
227    fn stream(id: &str) -> StreamId {
228        StreamId::try_new(id.to_owned()).expect("valid stream id")
229    }
230
231    #[test]
232    fn try_from_streams_succeeds_with_unique_streams() {
233        let result = StreamDeclarations::try_from_streams(vec![
234            stream("accounts::primary"),
235            stream("accounts::secondary"),
236        ]);
237
238        assert!(result.is_ok());
239    }
240
241    #[test]
242    fn try_from_streams_rejects_empty_collections() {
243        let result = StreamDeclarations::try_from_streams(Vec::new());
244
245        assert_eq!(Err(StreamDeclarationsError::Empty), result);
246    }
247
248    #[test]
249    fn try_from_streams_rejects_duplicate_streams() {
250        let duplicate = stream("accounts::primary");
251        let result =
252            StreamDeclarations::try_from_streams(vec![duplicate.clone(), duplicate.clone()]);
253
254        assert_eq!(
255            Err(StreamDeclarationsError::DuplicateStream {
256                duplicate: duplicate.clone(),
257            }),
258            result,
259        );
260    }
261
262    #[test]
263    fn with_participant_rejects_duplicate_streams() {
264        let existing = stream("accounts::primary");
265        let streams = StreamDeclarations::single(existing.clone());
266        let result = streams.with_participant(existing.clone());
267
268        assert_eq!(
269            Err(StreamDeclarationsError::DuplicateStream {
270                duplicate: existing,
271            }),
272            result,
273        );
274    }
275
276    #[test]
277    fn len_returns_number_of_declared_streams() {
278        let streams = StreamDeclarations::try_from_streams(vec![
279            stream("accounts::primary"),
280            stream("audit::shadow"),
281        ])
282        .expect("multi-stream declaration should succeed");
283
284        assert_eq!(2, streams.len());
285    }
286
287    #[test]
288    fn is_empty_returns_true_for_empty_construction() {
289        let result = StreamDeclarations::try_from_streams(Vec::<StreamId>::new());
290
291        assert!(matches!(result, Err(StreamDeclarationsError::Empty)));
292    }
293
294    #[test]
295    fn is_empty_returns_false_for_single_stream() {
296        let streams = StreamDeclarations::single(stream("accounts::primary"));
297
298        assert!(!streams.is_empty());
299    }
300
301    #[test]
302    fn is_empty_returns_false_for_multi_stream() {
303        let streams = StreamDeclarations::try_from_streams(vec![
304            stream("accounts::primary"),
305            stream("audit::shadow"),
306        ])
307        .expect("multi-stream declaration should succeed");
308
309        assert!(!streams.is_empty());
310    }
311
312    #[test]
313    fn stream_declarations_len_and_is_empty_consistency() {
314        let primary = stream("accounts::primary");
315        let secondary = stream("audit::shadow");
316
317        let single = StreamDeclarations::single(primary.clone());
318        let multi = StreamDeclarations::try_from_streams(vec![primary, secondary])
319            .expect("multi-stream declaration should succeed");
320        let empty_error = StreamDeclarations::try_from_streams(Vec::<StreamId>::new())
321            .expect_err("empty set rejected");
322
323        let observed = (
324            single.len(),
325            single.is_empty(),
326            multi.len(),
327            multi.is_empty(),
328            matches!(empty_error, StreamDeclarationsError::Empty),
329        );
330
331        assert_eq!(observed, (1, false, 2, false, true));
332    }
333
334    #[test]
335    fn iter_yields_declared_streams() {
336        let primary = stream("accounts::primary");
337        let secondary = stream("audit::shadow");
338        let declarations =
339            StreamDeclarations::try_from_streams(vec![primary.clone(), secondary.clone()])
340                .expect("multi-stream declaration should succeed");
341
342        let collected: Vec<&StreamId> = declarations.iter().collect();
343
344        assert_eq!(collected, vec![&primary, &secondary]);
345    }
346}