1use anyhow::{anyhow, Result};
16use async_channel::{bounded, Receiver, Sender};
17use futures_util::FutureExt;
18use futures_util::TryFutureExt;
19use log::{debug, error, info};
20use prost::Message;
21use std::collections::HashMap;
22use std::fmt::{Debug, Formatter};
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::{Arc, Mutex};
26use tokio::select;
27use tokio::signal::unix::Signal;
28use tokio::task::JoinHandle;
29use tonic::transport::Channel;
30use tonic::{Response, Status};
31use uuid::Uuid;
32
33mod command_submit;
34mod command_worker;
35mod connection;
36mod event_processor;
37mod event_query;
38mod handler_registry;
39mod query_processor;
40mod query_submit;
41
42use crate::axon_server::command::command_service_client::CommandServiceClient;
43use crate::axon_server::command::{Command, CommandResponse};
44pub use crate::axon_server::SerializedObject;
45use crate::axon_utils::handler_registry::PinFuture;
46use crate::axon_utils::WorkerCommand::Unsubscribe;
47pub use command_submit::init as init_command_sender;
48pub use command_submit::SubmitCommand;
49pub use command_worker::command_worker;
50pub use command_worker::{
51 create_aggregate_definition, emit, emit_events, emit_events_and_response,
52 empty_aggregate_registry, AggregateContext, AggregateContextTrait, AggregateDefinition,
53 AggregateRegistry, EmitApplicableEventsAndResponse, TheAggregateRegistry,
54};
55pub use connection::{platform_worker, platform_worker_for, wait_for_server};
56pub use event_processor::{event_processor, TokenStore};
57pub use event_query::query_events;
58pub use handler_registry::empty_handler_registry;
59pub use handler_registry::{HandleBuilder, HandlerRegistry, TheHandlerRegistry};
60pub use query_processor::{query_processor, QueryContext, QueryResult};
61
62pub type WorkerThread = Box<dyn FnOnce(AxonServerHandle, WorkerControl) -> PinFuture<()> + Sync>;
63
64struct WorkerRegistry {
65 workers: HashMap<Uuid, WorkerHandle>,
66 notifications: Receiver<Uuid>,
67}
68
69impl Debug for WorkerRegistry {
70 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71 f.write_str("[WorkerRegistry:")?;
72 for worker in self.workers.values() {
73 f.write_str(&format!("{:?}->{:?},", worker.id, worker.label))?;
74 }
75 f.write_str("]")?;
76 Ok(())
77 }
78}
79
80#[derive(Debug, Clone)]
82pub struct AxonServerHandle {
83 pub display_name: String,
84 pub client_id: String,
85 pub conn: Channel,
86 pub notify: Sender<Uuid>,
87 registry: Arc<Mutex<WorkerRegistry>>,
88}
89
90impl AxonServerHandle {
91 fn has_workers(&self) -> Result<bool> {
92 let registry = self.registry.lock();
93 let registry = registry.map_err(|e| anyhow!(e.to_string()))?;
94 debug!("Remaining workers: {:?}", ®istry.workers);
95 Ok(!registry.workers.is_empty())
96 }
97
98 fn remove_worker(&self, id: &Uuid) -> Result<()> {
99 let mut registry = self.registry.lock();
100 let registry = registry.as_mut().map_err(|e| anyhow!(e.to_string()))?;
101 registry.workers.remove(id);
102 Ok(())
103 }
104
105 async fn get_stopped_worker(&self) -> Result<Uuid> {
106 let stopped_worker_receiver = self.get_stopped_worker_receiver()?;
107 stopped_worker_receiver
108 .recv()
109 .await
110 .map_err(|e| anyhow!(e.to_string()))
111 }
112
113 async fn get_stopped_worker_with_signal(
114 &self,
115 signal_option: &mut Option<Signal>,
116 ) -> Result<Uuid> {
117 match signal_option {
118 Some(signal) => {
119 let stopped_worker_receiver = self.get_stopped_worker_receiver()?;
120 select! {
121 id = stopped_worker_receiver.recv() => id.map_err(|e| anyhow!(e.to_string())),
122 _ = signal.recv() => Ok(Uuid::new_v4())
123 }
124 }
125 None => self.get_stopped_worker().await,
126 }
127 }
128
129 fn get_stopped_worker_receiver(&self) -> Result<Receiver<Uuid>> {
130 let registry = self.registry.lock();
131 let registry = registry.map_err(|e| anyhow!(e.to_string()))?;
132 Ok(registry.notifications.clone())
133 }
134}
135
136#[derive(Eq, PartialEq)]
137pub enum WorkerCommand {
138 Unsubscribe,
139 Stop,
140}
141
142pub struct WorkerHandle {
143 id: Uuid,
144 join_handle: Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>>,
145 control_channel: Sender<WorkerCommand>,
146 label: String,
147}
148
149pub struct WorkerControl {
150 control_channel: Receiver<WorkerCommand>,
151 label: String,
152}
153
154impl WorkerControl {
155 pub fn get_label(&self) -> &str {
156 &*self.label
157 }
158 pub fn get_control_channel(&self) -> Receiver<WorkerCommand> {
159 self.control_channel.clone()
160 }
161}
162
163impl WorkerHandle {
164 pub fn get_id(&self) -> Uuid {
165 self.id
166 }
167 pub fn get_join_handle(
168 &mut self,
169 ) -> &mut Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
170 &mut self.join_handle
171 }
172 pub fn get_control_channel(&self) -> &Sender<WorkerCommand> {
173 &self.control_channel
174 }
175}
176
177impl Debug for WorkerHandle {
178 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
179 f.write_str("[WorkerHandle:")?;
180 self.id.fmt(f)?;
181 f.write_str(":")?;
182 self.label.fmt(f)?;
183 f.write_str("]")?;
184 Ok(())
185 }
186}
187
188pub trait IntoPinFuture {
189 fn into_pin_future(
190 self,
191 axon_serve_handle: &AxonServerHandle,
192 worker_control: WorkerControl,
193 ) -> PinFuture<()>;
194}
195
196impl<T, F> IntoPinFuture for &'static T
197where
198 T: Fn(AxonServerHandle, WorkerControl) -> F,
199 F: Future<Output = ()> + Send + 'static,
200{
201 fn into_pin_future(
202 self,
203 axon_serve_handle: &AxonServerHandle,
204 worker_control: WorkerControl,
205 ) -> PinFuture<()> {
206 Box::pin(self((*axon_serve_handle).clone(), worker_control))
207 }
208}
209
210impl<T, F> IntoPinFuture for Box<T>
211where
212 T: FnOnce(AxonServerHandle, WorkerControl) -> F,
213 F: Future<Output = ()> + Send + 'static,
214{
215 fn into_pin_future(
216 self,
217 axon_serve_handle: &AxonServerHandle,
218 worker_control: WorkerControl,
219 ) -> PinFuture<()> {
220 Box::pin(self((*axon_serve_handle).clone(), worker_control))
221 }
222}
223
224impl IntoPinFuture for WorkerThread {
225 fn into_pin_future(
226 self,
227 axon_serve_handle: &AxonServerHandle,
228 worker_control: WorkerControl,
229 ) -> PinFuture<()> {
230 (self)((*axon_serve_handle).clone(), worker_control).into()
231 }
232}
233
234impl AxonServerHandle {
235 #[deprecated]
236 pub fn spawn_ref<T, F, S: Into<String>>(&self, label: S, task: &'static T) -> Result<()>
237 where
238 T: Fn(AxonServerHandle, WorkerControl) -> F,
239 F: Future<Output = ()> + Send + 'static,
240 {
241 self.spawn(label, task)?;
242 Ok(())
243 }
244 pub fn spawn<S: Into<String>, T: IntoPinFuture>(&self, label: S, task: T) -> Result<Uuid> {
245 let label = label.into();
246 let notify = self.notify.clone();
247 let id = Uuid::new_v4();
248 let (tx, rx) = bounded(10);
249 let worker_control = WorkerControl {
250 control_channel: rx,
251 label: label.clone(),
252 };
253 let worker_future = task.into_pin_future(self, worker_control);
254 let join_handle = spawn_worker(worker_future, notify, id).map_err(Into::into);
255 let handle = WorkerHandle {
256 id,
257 join_handle: Some(Box::pin(join_handle)),
258 control_channel: tx,
259 label,
260 };
261 let mut registry = self.registry.lock();
262 let registry = registry.as_mut().map_err(|e| anyhow!(e.to_string()))?;
263 registry.workers.insert(id, handle);
264 Ok(id)
265 }
266}
267
268fn spawn_worker<T>(future: T, notify: Sender<Uuid>, id: Uuid) -> JoinHandle<T::Output>
269where
270 T: Future + Send + 'static,
271 T::Output: Send + 'static,
272{
273 tokio::spawn(future.then(move |result| async move {
274 if let Err(e) = notify.send(id.clone()).await {
275 debug!(
276 "Termination notification failed for worker: {:?}: {:?}",
277 id.clone(),
278 e
279 );
280 }
281 info!("Worker stopped: {:?}", id);
282 result
283 }))
284}
285
286pub trait AxonServerHandleTrait: Sync + AxonServerHandleAsyncTrait {
287 fn client_id(&self) -> &str;
288 fn display_name(&self) -> &str;
289}
290
291#[tonic::async_trait]
292pub trait AxonServerHandleAsyncTrait {
293 async fn dispatch(&self, request: Command) -> Result<Response<CommandResponse>, Status>;
294 async fn join_workers(&self) -> Result<()>;
295 async fn join_workers_with_signal(&self, terminate: &mut Option<Signal>) -> Result<()>;
296}
297
298impl AxonServerHandleTrait for AxonServerHandle {
299 fn client_id(&self) -> &str {
300 &self.client_id
301 }
302 fn display_name(&self) -> &str {
303 &self.display_name
304 }
305}
306
307#[tonic::async_trait]
308impl AxonServerHandleAsyncTrait for AxonServerHandle {
309 async fn dispatch(&self, request: Command) -> Result<Response<CommandResponse>, Status> {
310 let mut client = CommandServiceClient::new(self.conn.clone());
311 client.dispatch(request).await
312 }
313 async fn join_workers(&self) -> Result<()> {
314 let mut never: Option<Signal> = None;
315 self.join_workers_with_signal(&mut never).await
316 }
317 async fn join_workers_with_signal(&self, terminate: &mut Option<Signal>) -> Result<()> {
318 if !self.has_workers()? {
319 return Ok(());
320 }
321 let stopped_worker = self.get_stopped_worker_with_signal(terminate).await?;
322 self.remove_worker(&stopped_worker)?;
323 let senders = {
324 let mut registry = self.registry.lock();
325 let registry = registry.as_mut().map_err(|e| anyhow!(e.to_string()))?;
326 let mut pairs = Vec::new();
327 for worker in registry.workers.values() {
328 info!("Worker: {:?}: {:?}", &worker.id, &worker.label);
329 pairs.push((worker.id, worker.control_channel.clone()));
330 }
331 pairs
332 };
333 for (worker_id, sender) in senders {
334 info!("{:?}: Stopping", worker_id);
335 sender
336 .send(Unsubscribe)
337 .await
338 .map_err(|e| {
339 error!(
340 "Error while sending 'Unsubscribe': {:?}: {:?}",
341 e, worker_id
342 );
343 ()
344 })
345 .ok();
346 sender
347 .send(WorkerCommand::Stop)
348 .await
349 .map_err(|e| {
350 error!("Error while sending 'Stop': {:?}: {:?}", e, worker_id);
351 ()
352 })
353 .ok();
354 }
355 while self.has_workers()? {
356 let stopped_worker = self.get_stopped_worker().await?;
357 self.remove_worker(&stopped_worker)?;
358 }
359 Ok(())
360 }
361}
362
363pub trait VecU8Message {
365 fn encode_u8(&self, buf: &mut Vec<u8>) -> Result<()>;
366}
367
368impl<T> VecU8Message for T
369where
370 T: Message + Sized,
371{
372 fn encode_u8(&self, buf: &mut Vec<u8>) -> Result<()> {
373 self.encode(buf).map_err(|e| {
374 anyhow!(
375 "Prost encode error: {:?}: {:?}",
376 e.required_capacity(),
377 e.remaining()
378 )
379 })
380 }
381}
382
383#[tonic::async_trait]
385pub trait CommandSink {
386 #[deprecated(since = "0.8.0", note = "Use struct `SubmitCommand` instead")]
387 async fn send_command(
388 &self,
389 command_type: &str,
390 command: &(dyn VecU8Message + Sync),
391 ) -> Result<Option<SerializedObject>>;
392}
393
394#[tonic::async_trait]
396pub trait QuerySink {
397 async fn send_query<'a>(
398 &self,
399 query_type: &str,
400 query: &(dyn VecU8Message + Sync),
401 ) -> Result<Vec<SerializedObject>>;
402}
403
404pub fn axon_serialize<T: Message>(type_name: &str, message: &T) -> Result<SerializedObject> {
406 let mut buf = Vec::new();
407 message.encode(&mut buf)?;
408 let result = SerializedObject {
409 r#type: type_name.to_string(),
410 revision: "".to_string(),
411 data: buf,
412 };
413 debug!("Encoded output: {:?}", &result);
414 Ok(result)
415}
416
417pub trait ApplicableTo<Projection, Metadata>
419where
420 Self: VecU8Message + Send + Sync + Debug,
421{
422 fn apply_to(self, metadata: Metadata, projection: &mut Projection) -> Result<()>; fn box_clone(&self) -> Box<dyn ApplicableTo<Projection, Metadata>>;
427}
428
429#[tonic::async_trait]
431pub trait AsyncApplicableTo<Projection, Metadata>
432where
433 Self: VecU8Message + Send + Sync + Debug,
434{
435 async fn apply_to(self, metadata: Metadata, projection: &mut Projection) -> Result<()>;
437
438 fn box_clone(&self) -> Box<dyn AsyncApplicableTo<Projection, Metadata>>;
439}