eventcore_types/command.rs
1use std::collections::HashSet;
2
3use serde::{Serialize, de::DeserializeOwned};
4use thiserror::Error;
5
6use crate::errors::CommandError;
7use crate::store::StreamId;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct StreamDeclarations {
11 streams: Vec<StreamId>,
12}
13
14#[derive(Debug, Clone, PartialEq, Eq, Error)]
15pub enum StreamDeclarationsError {
16 #[error("commands must declare at least one stream")]
17 Empty,
18 #[error("duplicate stream declared: {duplicate:?}")]
19 DuplicateStream { duplicate: StreamId },
20}
21
22impl StreamDeclarations {
23 pub fn try_from_streams<I>(streams: I) -> Result<Self, StreamDeclarationsError>
24 where
25 I: IntoIterator<Item = StreamId>,
26 {
27 let mut seen = HashSet::new();
28 let mut collected = Vec::new();
29
30 for stream in streams.into_iter() {
31 if !seen.insert(stream.clone()) {
32 return Err(StreamDeclarationsError::DuplicateStream { duplicate: stream });
33 }
34
35 collected.push(stream);
36 }
37
38 if collected.is_empty() {
39 return Err(StreamDeclarationsError::Empty);
40 }
41
42 Ok(Self { streams: collected })
43 }
44
45 pub fn single(stream: StreamId) -> Self {
46 Self {
47 streams: vec![stream],
48 }
49 }
50
51 pub fn with_participant(self, participant: StreamId) -> Result<Self, StreamDeclarationsError> {
52 let mut streams = self.streams;
53 streams.push(participant);
54 Self::try_from_streams(streams)
55 }
56
57 /// Returns true if the declaration contains no streams.
58 ///
59 /// Note: Valid instances constructed via `single()`, `try_from_streams()`, or
60 /// `with_participant()` are never empty. This method exists for API completeness
61 /// (e.g., to satisfy `clippy::len_without_is_empty`).
62 #[must_use]
63 pub fn is_empty(&self) -> bool {
64 self.streams.is_empty()
65 }
66
67 #[must_use]
68 pub fn len(&self) -> usize {
69 self.streams.len()
70 }
71
72 pub fn iter(&self) -> impl Iterator<Item = &StreamId> {
73 self.streams.iter()
74 }
75}
76
77/// Infrastructure trait describing the streams required to execute a command.
78///
79/// Per ADR-006, stream declarations are generated or implemented separately from
80/// the business logic so infrastructure can evolve independently. Commands
81/// typically use [`StreamDeclarations::single`] for single-stream workflows or
82/// [`StreamDeclarations::try_from_streams`] when coordinating multiple streams.
83pub trait CommandStreams {
84 fn stream_declarations(&self) -> StreamDeclarations;
85}
86
87/// Trait for runtime stream discovery when static declarations are insufficient.
88///
89/// Commands implement this trait when related streams cannot be known at compile
90/// time (for example, when a customer ID needs to be resolved from reconstructed
91/// state). Executors call this hook after reading declared streams so commands
92/// can request additional stream IDs to load before running business logic.
93///
94/// Implementations should return unique stream IDs; the executor deduplicates
95/// defensively but redundant IDs still waste I/O. Streams listed here are folded
96/// into the state reconstruction pass and participate in optimistic concurrency
97/// along with the statically declared streams.
98pub trait StreamResolver<State> {
99 /// Discovers additional stream IDs to load based on reconstructed state.
100 fn discover_related_streams(&self, state: &State) -> Vec<StreamId>;
101}
102
103/// Event trait for domain-first event sourcing.
104///
105/// Per ADR-012, domain types implement this trait to become events. The trait provides
106/// the minimal infrastructure contract: events must know their stream identity
107/// (aggregate ID) and support necessary operations for storage and async handling.
108///
109/// # Trait Bounds
110///
111/// * `Clone` - Required for state reconstruction (apply method may need events multiple times)
112/// * `Send` - Required for async storage backends and cross-thread event handling
113/// * `'static` - Required for type erasure in storage and async trait boundaries
114pub trait Event: Clone + Send + Serialize + DeserializeOwned + 'static {
115 /// Returns the stream this event belongs to.
116 ///
117 /// The stream ID represents the aggregate identity in Domain-Driven Design.
118 /// Each domain event knows which aggregate instance it belongs to.
119 fn stream_id(&self) -> &StreamId;
120}
121
122/// Trait defining the business logic of a command.
123///
124/// Commands encapsulate business operations that read from event streams,
125/// reconstruct state, validate business rules, and produce events.
126///
127/// Stream declarations are provided separately via [`CommandStreams`] so that
128/// infrastructure (such as proc-macros defined in ADR-006) can evolve
129/// independently while this trait focuses purely on domain behavior.
130///
131/// Per ADR-012, commands use an associated type for their event type rather than
132/// a generic parameter, providing better type inference and cleaner APIs.
133///
134/// # Associated Types
135///
136/// * `Event` - The domain event type implementing the Event trait
137/// * `State` - The state type reconstructed from events via `apply()`
138pub trait CommandLogic: CommandStreams {
139 /// The domain event type this command produces.
140 ///
141 /// Must implement the Event trait to provide stream identity and
142 /// required infrastructure capabilities.
143 type Event: Event;
144
145 /// The state type accumulated from event history.
146 ///
147 /// This type represents the reconstructed state needed to validate
148 /// business rules and produce events. It's rebuilt from scratch for
149 /// each command execution by applying events via `apply()`.
150 type State: Default;
151
152 /// Reconstruct state by applying a single event.
153 ///
154 /// This method is called once per event in the stream(s) to rebuild
155 /// the complete state needed for command execution. It implements the
156 /// left-fold pattern: `events.fold(State::default(), apply)`.
157 ///
158 /// # Parameters
159 ///
160 /// * `state` - The accumulated state so far
161 /// * `event` - The next event to apply (borrowed reference)
162 ///
163 /// # Returns
164 ///
165 /// The updated state after applying the event
166 fn apply(&self, state: Self::State, event: &Self::Event) -> Self::State;
167
168 /// Execute business logic and produce events.
169 ///
170 /// This method validates business rules using the reconstructed state
171 /// and returns events to be persisted. It's a pure function that
172 /// makes domain decisions without performing I/O or side effects.
173 ///
174 /// # Parameters
175 ///
176 /// * `state` - The reconstructed state from all events
177 ///
178 /// # Returns
179 ///
180 /// * `Ok(NewEvents<Self::Event>)` if business rules pass and events produced
181 /// * `Err(CommandError)` if business rules violated
182 fn handle(&self, state: Self::State) -> Result<NewEvents<Self::Event>, CommandError>;
183
184 /// Returns a runtime stream resolver when the command needs dynamic discovery.
185 ///
186 /// Commands that implement [`StreamResolver`] can return `Some(self)` or a
187 /// dedicated resolver type so the executor loads additional streams after
188 /// reconstructing state. The default implementation returns `None`, meaning
189 /// the command relies solely on static [`CommandStreams`] declarations.
190 fn stream_resolver(&self) -> Option<&(dyn StreamResolver<Self::State> + Sync)> {
191 None
192 }
193}
194
195/// Collection of new events produced by a command.
196///
197/// This type represents the output of `CommandLogic::handle()` - the
198/// events that should be persisted as a result of command execution.
199///
200/// Per ADR-012, this works with domain event types that implement the Event trait.
201pub struct NewEvents<E: Event> {
202 events: Vec<E>,
203}
204
205impl<E: Event> From<Vec<E>> for NewEvents<E> {
206 fn from(events: Vec<E>) -> Self {
207 Self { events }
208 }
209}
210
211impl<E: Event> From<NewEvents<E>> for Vec<E> {
212 fn from(new_events: NewEvents<E>) -> Self {
213 new_events.events
214 }
215}
216
217impl<E: Event> Default for NewEvents<E> {
218 fn default() -> Self {
219 Self { events: Vec::new() }
220 }
221}
222
223#[cfg(test)]
224mod tests {
225 use super::*;
226
227 fn stream(id: &str) -> StreamId {
228 StreamId::try_new(id.to_owned()).expect("valid stream id")
229 }
230
231 #[test]
232 fn try_from_streams_succeeds_with_unique_streams() {
233 let result = StreamDeclarations::try_from_streams(vec![
234 stream("accounts::primary"),
235 stream("accounts::secondary"),
236 ]);
237
238 assert!(result.is_ok());
239 }
240
241 #[test]
242 fn try_from_streams_rejects_empty_collections() {
243 let result = StreamDeclarations::try_from_streams(Vec::new());
244
245 assert_eq!(Err(StreamDeclarationsError::Empty), result);
246 }
247
248 #[test]
249 fn try_from_streams_rejects_duplicate_streams() {
250 let duplicate = stream("accounts::primary");
251 let result =
252 StreamDeclarations::try_from_streams(vec![duplicate.clone(), duplicate.clone()]);
253
254 assert_eq!(
255 Err(StreamDeclarationsError::DuplicateStream {
256 duplicate: duplicate.clone(),
257 }),
258 result,
259 );
260 }
261
262 #[test]
263 fn with_participant_rejects_duplicate_streams() {
264 let existing = stream("accounts::primary");
265 let streams = StreamDeclarations::single(existing.clone());
266 let result = streams.with_participant(existing.clone());
267
268 assert_eq!(
269 Err(StreamDeclarationsError::DuplicateStream {
270 duplicate: existing,
271 }),
272 result,
273 );
274 }
275
276 #[test]
277 fn len_returns_number_of_declared_streams() {
278 let streams = StreamDeclarations::try_from_streams(vec![
279 stream("accounts::primary"),
280 stream("audit::shadow"),
281 ])
282 .expect("multi-stream declaration should succeed");
283
284 assert_eq!(2, streams.len());
285 }
286
287 #[test]
288 fn is_empty_returns_true_for_empty_construction() {
289 let result = StreamDeclarations::try_from_streams(Vec::<StreamId>::new());
290
291 assert!(matches!(result, Err(StreamDeclarationsError::Empty)));
292 }
293
294 #[test]
295 fn is_empty_returns_false_for_single_stream() {
296 let streams = StreamDeclarations::single(stream("accounts::primary"));
297
298 assert!(!streams.is_empty());
299 }
300
301 #[test]
302 fn is_empty_returns_false_for_multi_stream() {
303 let streams = StreamDeclarations::try_from_streams(vec![
304 stream("accounts::primary"),
305 stream("audit::shadow"),
306 ])
307 .expect("multi-stream declaration should succeed");
308
309 assert!(!streams.is_empty());
310 }
311
312 #[test]
313 fn stream_declarations_len_and_is_empty_consistency() {
314 let primary = stream("accounts::primary");
315 let secondary = stream("audit::shadow");
316
317 let single = StreamDeclarations::single(primary.clone());
318 let multi = StreamDeclarations::try_from_streams(vec![primary, secondary])
319 .expect("multi-stream declaration should succeed");
320 let empty_error = StreamDeclarations::try_from_streams(Vec::<StreamId>::new())
321 .expect_err("empty set rejected");
322
323 let observed = (
324 single.len(),
325 single.is_empty(),
326 multi.len(),
327 multi.is_empty(),
328 matches!(empty_error, StreamDeclarationsError::Empty),
329 );
330
331 assert_eq!(observed, (1, false, 2, false, true));
332 }
333
334 #[test]
335 fn iter_yields_declared_streams() {
336 let primary = stream("accounts::primary");
337 let secondary = stream("audit::shadow");
338 let declarations =
339 StreamDeclarations::try_from_streams(vec![primary.clone(), secondary.clone()])
340 .expect("multi-stream declaration should succeed");
341
342 let collected: Vec<&StreamId> = declarations.iter().collect();
343
344 assert_eq!(collected, vec![&primary, &secondary]);
345 }
346}