1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use std::{ops::Deref, sync::Arc};

use parking_lot::Mutex;

/// A service that controls access to some state
///
/// `Service` is a generic wrapper around some state, as well as code that knows
/// how to operate on that state. It processes commands, changes the state based
/// on those command, and produces events that capture these changes. These
/// events are stored, providing a log of all changes to the state, and can be
/// replayed later to re-create the state at any point in time.
///
/// The wrapped state must implement [`State`], which defines the type of
/// command that this service processes, and the type of event that captures
/// state changes. It also defines methods that operate on the state, commands,
/// and events.
///
/// Implementations of [`State`] might also define an extension trait for a
/// specific `Service<MyState>`, to provide a convenient API to callers.
///
/// This design takes inspiration from, and uses the nomenclature of, this
/// article:
/// <https://thinkbeforecoding.com/post/2021/12/17/functional-event-sourcing-decider>
pub struct Service<S: State> {
    state: S,
    events: Vec<S::Event>,
    subscribers: Vec<Arc<Mutex<dyn Subscriber<S::Event>>>>,
}

impl<S: State> Service<S> {
    /// Create an instance of `Service`
    pub fn new(state: S) -> Self {
        Self {
            state,
            events: Vec::new(),
            subscribers: Vec::new(),
        }
    }

    /// Add a subscriber
    pub fn subscribe(
        &mut self,
        subscriber: Arc<Mutex<dyn Subscriber<S::Event>>>,
    ) {
        self.subscribers.push(subscriber);
    }

    /// Execute a command
    ///
    /// The command is executed synchronously. When this method returns, the
    /// state has been updated and any events have been logged.
    pub fn execute(&mut self, command: S::Command) {
        let mut events = Vec::new();
        self.state.decide(command, &mut events);

        for event in &events {
            self.state.evolve(event);

            for subscriber in &self.subscribers {
                let mut subscriber = subscriber.lock();
                subscriber.handle_event(event);
            }
        }

        self.events.extend(events);
    }

    /// Access the events
    pub fn events(&self) -> impl Iterator<Item = &S::Event> {
        self.events.iter()
    }

    /// Replay the provided events on the given state
    pub fn replay<'event>(
        state: &mut S,
        events: impl IntoIterator<Item = &'event S::Event>,
    ) where
        <S as State>::Event: 'event,
    {
        for event in events {
            state.evolve(event);
        }
    }
}

impl<S: State> Deref for Service<S> {
    type Target = S;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<S: State> Default for Service<S>
where
    S: Default,
{
    fn default() -> Self {
        Self::new(S::default())
    }
}

impl<S: State> Subscriber<S::Command> for Service<S> {
    fn handle_event(&mut self, event: &S::Command) {
        self.execute(event.clone());
    }
}

/// Implemented for state that can be wrapped by a [`Service`]
///
/// See [`Service`] for a detailed explanation.
pub trait State {
    /// A command that relates to the state
    ///
    /// Commands are processed by [`State::decide`].
    type Command: Clone;

    /// An event that captures modifications to this state
    ///
    /// Events are produced by [`State::decide`] and processed by
    /// [`State::evolve`].
    type Event: Clone;

    /// Decide how to react to the provided command
    ///
    /// If the command must result in changes to the state, any number of events
    /// that describe these state changes can be produced.
    fn decide(&self, command: Self::Command, events: &mut Vec<Self::Event>);

    /// Evolve the state according to the provided event
    ///
    /// This is the only method gets mutable access to the state, making sure
    /// that all changes to the state are captured as events.
    ///
    /// Implementations of this method are supposed to be relatively dumb. Any
    /// decisions that go into updating the state should be made in
    /// [`State::decide`], and encoded into the event.
    fn evolve(&mut self, event: &Self::Event);
}

pub trait Subscriber<T> {
    fn handle_event(&mut self, event: &T);
}