dendrite_lib/axon_utils/
mod.rs

1//! # Axon Utilities
2//!
3//! Module `axon_utils` exports items that can be used to create a complete Event Sourced CQRS
4//! application that uses [AxonServer](https://axoniq.io/product-overview/axon-server) as an event store.
5//! The whole back-end can be packaged as a single application, but when growing load or complexity
6//! demands it, aspects can be taken out and converted to microservices that can be scaled horizontally.
7//!
8//! The basic parts of a dendrite application are:
9//! * Command API — _accepts commands on a gRPC API and forwards them (again over gRPC to AxonServer)_
10//! * Command worker — _subscribes to commands, verifies them against the command projection and sends emitted events to AxonServer_
11//! * Event processor — _subscribes to events and builds a query model from them (there are likely to be multiple query models for a single application)_
12//! * Query API — _accepts queries on a gRPC API and forwards them (again over gRPC to AxonServer)_
13//! * Query processor — _subscribes to queries, executes them against a query model and pass back the results_
14
15use anyhow::{anyhow, Result};
16use async_channel::{bounded, Receiver, Sender};
17use futures_util::FutureExt;
18use futures_util::TryFutureExt;
19use log::{debug, error, info};
20use prost::Message;
21use std::collections::HashMap;
22use std::fmt::{Debug, Formatter};
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use tokio::select;
27use tokio::signal::unix::Signal;
28use tokio::task::JoinHandle;
29use tonic::transport::Channel;
30use tonic::{Response, Status};
31use uuid::Uuid;
32
33mod command_submit;
34mod command_worker;
35mod connection;
36mod event_processor;
37mod event_query;
38mod handler_registry;
39mod query_processor;
40mod query_submit;
41
42use crate::axon_server::command::command_service_client::CommandServiceClient;
43use crate::axon_server::command::{Command, CommandResponse};
44pub use crate::axon_server::SerializedObject;
45use crate::axon_utils::handler_registry::PinFuture;
46use crate::axon_utils::WorkerCommand::Unsubscribe;
47pub use command_submit::init as init_command_sender;
48pub use command_submit::SubmitCommand;
49pub use command_worker::command_worker;
50pub use command_worker::{
51    create_aggregate_definition, emit, emit_events, emit_events_and_response,
52    empty_aggregate_registry, AggregateContext, AggregateContextTrait, AggregateDefinition,
53    AggregateRegistry, EmitApplicableEventsAndResponse, TheAggregateRegistry,
54};
55pub use connection::{platform_worker, platform_worker_for, wait_for_server};
56pub use event_processor::{event_processor, TokenStore};
57pub use event_query::query_events;
58pub use handler_registry::empty_handler_registry;
59pub use handler_registry::{HandleBuilder, HandlerRegistry, TheHandlerRegistry};
60pub use query_processor::{query_processor, QueryContext, QueryResult};
61
62pub type WorkerThread = Box<dyn FnOnce(AxonServerHandle, WorkerControl) -> PinFuture<()> + Sync>;
63
64struct WorkerRegistry {
65    workers: HashMap<Uuid, WorkerHandle>,
66    notifications: Receiver<Uuid>,
67}
68
69impl Debug for WorkerRegistry {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71        f.write_str("[WorkerRegistry:")?;
72        for worker in self.workers.values() {
73            f.write_str(&format!("{:?}->{:?},", worker.id, worker.label))?;
74        }
75        f.write_str("]")?;
76        Ok(())
77    }
78}
79
80/// A handle for AxonServer.
81#[derive(Debug, Clone)]
82pub struct AxonServerHandle {
83    pub display_name: String,
84    pub client_id: String,
85    pub conn: Channel,
86    pub notify: Sender<Uuid>,
87    registry: Arc<Mutex<WorkerRegistry>>,
88}
89
90impl AxonServerHandle {
91    fn has_workers(&self) -> Result<bool> {
92        let registry = self.registry.lock();
93        let registry = registry.map_err(|e| anyhow!(e.to_string()))?;
94        debug!("Remaining workers: {:?}", &registry.workers);
95        Ok(!registry.workers.is_empty())
96    }
97
98    fn remove_worker(&self, id: &Uuid) -> Result<()> {
99        let mut registry = self.registry.lock();
100        let registry = registry.as_mut().map_err(|e| anyhow!(e.to_string()))?;
101        registry.workers.remove(id);
102        Ok(())
103    }
104
105    async fn get_stopped_worker(&self) -> Result<Uuid> {
106        let stopped_worker_receiver = self.get_stopped_worker_receiver()?;
107        stopped_worker_receiver
108            .recv()
109            .await
110            .map_err(|e| anyhow!(e.to_string()))
111    }
112
113    async fn get_stopped_worker_with_signal(
114        &self,
115        signal_option: &mut Option<Signal>,
116    ) -> Result<Uuid> {
117        match signal_option {
118            Some(signal) => {
119                let stopped_worker_receiver = self.get_stopped_worker_receiver()?;
120                select! {
121                    id = stopped_worker_receiver.recv() => id.map_err(|e| anyhow!(e.to_string())),
122                    _ = signal.recv() => Ok(Uuid::new_v4())
123                }
124            }
125            None => self.get_stopped_worker().await,
126        }
127    }
128
129    fn get_stopped_worker_receiver(&self) -> Result<Receiver<Uuid>> {
130        let registry = self.registry.lock();
131        let registry = registry.map_err(|e| anyhow!(e.to_string()))?;
132        Ok(registry.notifications.clone())
133    }
134}
135
136#[derive(Eq, PartialEq)]
137pub enum WorkerCommand {
138    Unsubscribe,
139    Stop,
140}
141
142pub struct WorkerHandle {
143    id: Uuid,
144    join_handle: Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>>,
145    control_channel: Sender<WorkerCommand>,
146    label: String,
147}
148
149pub struct WorkerControl {
150    control_channel: Receiver<WorkerCommand>,
151    label: String,
152}
153
154impl WorkerControl {
155    pub fn get_label(&self) -> &str {
156        &*self.label
157    }
158    pub fn get_control_channel(&self) -> Receiver<WorkerCommand> {
159        self.control_channel.clone()
160    }
161}
162
163impl WorkerHandle {
164    pub fn get_id(&self) -> Uuid {
165        self.id
166    }
167    pub fn get_join_handle(
168        &mut self,
169    ) -> &mut Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
170        &mut self.join_handle
171    }
172    pub fn get_control_channel(&self) -> &Sender<WorkerCommand> {
173        &self.control_channel
174    }
175}
176
177impl Debug for WorkerHandle {
178    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179        f.write_str("[WorkerHandle:")?;
180        self.id.fmt(f)?;
181        f.write_str(":")?;
182        self.label.fmt(f)?;
183        f.write_str("]")?;
184        Ok(())
185    }
186}
187
188pub trait IntoPinFuture {
189    fn into_pin_future(
190        self,
191        axon_serve_handle: &AxonServerHandle,
192        worker_control: WorkerControl,
193    ) -> PinFuture<()>;
194}
195
196impl<T, F> IntoPinFuture for &'static T
197where
198    T: Fn(AxonServerHandle, WorkerControl) -> F,
199    F: Future<Output = ()> + Send + 'static,
200{
201    fn into_pin_future(
202        self,
203        axon_serve_handle: &AxonServerHandle,
204        worker_control: WorkerControl,
205    ) -> PinFuture<()> {
206        Box::pin(self((*axon_serve_handle).clone(), worker_control))
207    }
208}
209
210impl<T, F> IntoPinFuture for Box<T>
211where
212    T: FnOnce(AxonServerHandle, WorkerControl) -> F,
213    F: Future<Output = ()> + Send + 'static,
214{
215    fn into_pin_future(
216        self,
217        axon_serve_handle: &AxonServerHandle,
218        worker_control: WorkerControl,
219    ) -> PinFuture<()> {
220        Box::pin(self((*axon_serve_handle).clone(), worker_control))
221    }
222}
223
224impl IntoPinFuture for WorkerThread {
225    fn into_pin_future(
226        self,
227        axon_serve_handle: &AxonServerHandle,
228        worker_control: WorkerControl,
229    ) -> PinFuture<()> {
230        (self)((*axon_serve_handle).clone(), worker_control).into()
231    }
232}
233
234impl AxonServerHandle {
235    #[deprecated]
236    pub fn spawn_ref<T, F, S: Into<String>>(&self, label: S, task: &'static T) -> Result<()>
237    where
238        T: Fn(AxonServerHandle, WorkerControl) -> F,
239        F: Future<Output = ()> + Send + 'static,
240    {
241        self.spawn(label, task)?;
242        Ok(())
243    }
244    pub fn spawn<S: Into<String>, T: IntoPinFuture>(&self, label: S, task: T) -> Result<Uuid> {
245        let label = label.into();
246        let notify = self.notify.clone();
247        let id = Uuid::new_v4();
248        let (tx, rx) = bounded(10);
249        let worker_control = WorkerControl {
250            control_channel: rx,
251            label: label.clone(),
252        };
253        let worker_future = task.into_pin_future(self, worker_control);
254        let join_handle = spawn_worker(worker_future, notify, id).map_err(Into::into);
255        let handle = WorkerHandle {
256            id,
257            join_handle: Some(Box::pin(join_handle)),
258            control_channel: tx,
259            label,
260        };
261        let mut registry = self.registry.lock();
262        let registry = registry.as_mut().map_err(|e| anyhow!(e.to_string()))?;
263        registry.workers.insert(id, handle);
264        Ok(id)
265    }
266}
267
268fn spawn_worker<T>(future: T, notify: Sender<Uuid>, id: Uuid) -> JoinHandle<T::Output>
269where
270    T: Future + Send + 'static,
271    T::Output: Send + 'static,
272{
273    tokio::spawn(future.then(move |result| async move {
274        if let Err(e) = notify.send(id.clone()).await {
275            debug!(
276                "Termination notification failed for worker: {:?}: {:?}",
277                id.clone(),
278                e
279            );
280        }
281        info!("Worker stopped: {:?}", id);
282        result
283    }))
284}
285
286pub trait AxonServerHandleTrait: Sync + AxonServerHandleAsyncTrait {
287    fn client_id(&self) -> &str;
288    fn display_name(&self) -> &str;
289}
290
291#[tonic::async_trait]
292pub trait AxonServerHandleAsyncTrait {
293    async fn dispatch(&self, request: Command) -> Result<Response<CommandResponse>, Status>;
294    async fn join_workers(&self) -> Result<()>;
295    async fn join_workers_with_signal(&self, terminate: &mut Option<Signal>) -> Result<()>;
296}
297
298impl AxonServerHandleTrait for AxonServerHandle {
299    fn client_id(&self) -> &str {
300        &self.client_id
301    }
302    fn display_name(&self) -> &str {
303        &self.display_name
304    }
305}
306
307#[tonic::async_trait]
308impl AxonServerHandleAsyncTrait for AxonServerHandle {
309    async fn dispatch(&self, request: Command) -> Result<Response<CommandResponse>, Status> {
310        let mut client = CommandServiceClient::new(self.conn.clone());
311        client.dispatch(request).await
312    }
313    async fn join_workers(&self) -> Result<()> {
314        let mut never: Option<Signal> = None;
315        self.join_workers_with_signal(&mut never).await
316    }
317    async fn join_workers_with_signal(&self, terminate: &mut Option<Signal>) -> Result<()> {
318        if !self.has_workers()? {
319            return Ok(());
320        }
321        let stopped_worker = self.get_stopped_worker_with_signal(terminate).await?;
322        self.remove_worker(&stopped_worker)?;
323        let senders = {
324            let mut registry = self.registry.lock();
325            let registry = registry.as_mut().map_err(|e| anyhow!(e.to_string()))?;
326            let mut pairs = Vec::new();
327            for worker in registry.workers.values() {
328                info!("Worker: {:?}: {:?}", &worker.id, &worker.label);
329                pairs.push((worker.id, worker.control_channel.clone()));
330            }
331            pairs
332        };
333        for (worker_id, sender) in senders {
334            info!("{:?}: Stopping", worker_id);
335            sender
336                .send(Unsubscribe)
337                .await
338                .map_err(|e| {
339                    error!(
340                        "Error while sending 'Unsubscribe': {:?}: {:?}",
341                        e, worker_id
342                    );
343                    ()
344                })
345                .ok();
346            sender
347                .send(WorkerCommand::Stop)
348                .await
349                .map_err(|e| {
350                    error!("Error while sending 'Stop': {:?}: {:?}", e, worker_id);
351                    ()
352                })
353                .ok();
354        }
355        while self.has_workers()? {
356            let stopped_worker = self.get_stopped_worker().await?;
357            self.remove_worker(&stopped_worker)?;
358        }
359        Ok(())
360    }
361}
362
363/// Describes a message that can be serialized to a mutable `Vec<u8>`.
364pub trait VecU8Message {
365    fn encode_u8(&self, buf: &mut Vec<u8>) -> Result<()>;
366}
367
368impl<T> VecU8Message for T
369where
370    T: Message + Sized,
371{
372    fn encode_u8(&self, buf: &mut Vec<u8>) -> Result<()> {
373        self.encode(buf).map_err(|e| {
374            anyhow!(
375                "Prost encode error: {:?}: {:?}",
376                e.required_capacity(),
377                e.remaining()
378            )
379        })
380    }
381}
382
383/// Trait that is implemented by an object that can be used to send commands to AxonServer.
384#[tonic::async_trait]
385pub trait CommandSink {
386    #[deprecated(since = "0.8.0", note = "Use struct `SubmitCommand` instead")]
387    async fn send_command(
388        &self,
389        command_type: &str,
390        command: &(dyn VecU8Message + Sync),
391    ) -> Result<Option<SerializedObject>>;
392}
393
394/// Trait that is implemented by an object that can be used to send queries to AxonServer.
395#[tonic::async_trait]
396pub trait QuerySink {
397    async fn send_query<'a>(
398        &self,
399        query_type: &str,
400        query: &(dyn VecU8Message + Sync),
401    ) -> Result<Vec<SerializedObject>>;
402}
403
404/// Converts a `prost::Message` to an Axon `SerializedObject`.
405pub fn axon_serialize<T: Message>(type_name: &str, message: &T) -> Result<SerializedObject> {
406    let mut buf = Vec::new();
407    message.encode(&mut buf)?;
408    let result = SerializedObject {
409        r#type: type_name.to_string(),
410        revision: "".to_string(),
411        data: buf,
412    };
413    debug!("Encoded output: {:?}", &result);
414    Ok(result)
415}
416
417/// Describes a `Message` that is applicable to a particular projection type.
418pub trait ApplicableTo<Projection, Metadata>
419where
420    Self: VecU8Message + Send + Sync + Debug,
421{
422    /// Applies this message to the given projection.
423    fn apply_to(self, metadata: Metadata, projection: &mut Projection) -> Result<()>; // the self type is implicit
424
425    /// Creates a box with a clone of this message.
426    fn box_clone(&self) -> Box<dyn ApplicableTo<Projection, Metadata>>;
427}
428
429/// Describes a `Message` that is asynchronously applicable to a particular projection type.
430#[tonic::async_trait]
431pub trait AsyncApplicableTo<Projection, Metadata>
432where
433    Self: VecU8Message + Send + Sync + Debug,
434{
435    /// Applies this message to the given projection.
436    async fn apply_to(self, metadata: Metadata, projection: &mut Projection) -> Result<()>;
437
438    fn box_clone(&self) -> Box<dyn AsyncApplicableTo<Projection, Metadata>>;
439}