1use std::any::Any;
21use std::convert::Infallible;
22use std::fmt;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::sync::{Arc, Weak};
25use std::time::Instant;
26
27use crate::quickwit_common::metrics::IntCounter;
28use async_trait::async_trait;
29use tokio::sync::oneshot;
30
31use crate::channel_with_priority::{Receiver, Sender, TrySendError};
32use crate::envelope::{wrap_in_envelope, Envelope};
33use crate::scheduler::SchedulerClient;
34use crate::{
35 Actor, ActorContext, ActorExitStatus, AskError, DeferableReplyHandler, Handler, QueueCapacity,
36 RecvError, SendError,
37};
38
39pub struct Mailbox<A: Actor> {
56 inner: Arc<Inner<A>>,
57 ref_count: Arc<AtomicUsize>,
61}
62
63impl<A: Actor> Mailbox<A> {
64 pub fn downgrade(&self) -> WeakMailbox<A> {
65 WeakMailbox {
66 inner: Arc::downgrade(&self.inner),
67 ref_count: Arc::downgrade(&self.ref_count),
68 }
69 }
70}
71
72impl<A: Actor> Drop for Mailbox<A> {
73 fn drop(&mut self) {
74 let old_val = self.ref_count.fetch_sub(1, Ordering::SeqCst);
75 if old_val == 2 {
76 let _ = self.send_message_with_high_priority(LastMailbox);
80 }
81 }
82}
83
84#[derive(Debug)]
85struct LastMailbox;
86
87#[async_trait]
88impl<A: Actor> Handler<LastMailbox> for A {
89 type Reply = ();
90
91 async fn handle(
92 &mut self,
93 _: LastMailbox,
94 _ctx: &ActorContext<Self>,
95 ) -> Result<(), ActorExitStatus> {
96 Ok(())
110 }
111}
112
113#[derive(Copy, Clone)]
114pub(crate) enum Priority {
115 High,
116 Low,
117}
118
119impl<A: Actor> Clone for Mailbox<A> {
120 fn clone(&self) -> Self {
121 self.ref_count.fetch_add(1, Ordering::SeqCst);
122 Mailbox {
123 inner: self.inner.clone(),
124 ref_count: self.ref_count.clone(),
125 }
126 }
127}
128
129impl<A: Actor> Mailbox<A> {
130 pub(crate) fn is_last_mailbox(&self) -> bool {
131 self.ref_count.load(Ordering::SeqCst) == 1
132 }
133
134 pub fn id(&self) -> &str {
135 &self.inner.instance_id
136 }
137
138 pub(crate) fn scheduler_client(&self) -> Option<&SchedulerClient> {
139 self.inner.scheduler_client_opt.as_ref()
140 }
141}
142
143struct Inner<A: Actor> {
144 pub(crate) tx: Sender<Envelope<A>>,
145 scheduler_client_opt: Option<SchedulerClient>,
146 instance_id: String,
147}
148
149impl<A: Actor> fmt::Debug for Mailbox<A> {
150 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
151 f.debug_tuple("Mailbox")
152 .field(&self.actor_instance_id())
153 .finish()
154 }
155}
156
157impl<A: Actor> Mailbox<A> {
158 pub fn actor_instance_id(&self) -> &str {
159 &self.inner.instance_id
160 }
161
162 pub fn is_disconnected(&self) -> bool {
163 self.inner.tx.is_disconnected()
164 }
165
166 pub async fn send_message<M>(
172 &self,
173 message: M,
174 ) -> Result<oneshot::Receiver<A::Reply>, SendError>
175 where
176 A: DeferableReplyHandler<M>,
177 M: 'static + Send + Sync + fmt::Debug,
178 {
179 self.send_message_with_backpressure_counter(message, None)
180 .await
181 }
182
183 pub fn try_send_message<M>(
187 &self,
188 message: M,
189 ) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
190 where
191 A: DeferableReplyHandler<M>,
192 M: 'static + Send + Sync + fmt::Debug,
193 {
194 let (envelope, response_rx) = self.wrap_in_envelope(message);
195 self.inner
196 .tx
197 .try_send_low_priority(envelope)
198 .map_err(|err| {
199 match err {
200 TrySendError::Disconnected => TrySendError::Disconnected,
201 TrySendError::Full(mut envelope) => {
202 let message: M = envelope.message_typed().unwrap();
204 TrySendError::Full(message)
205 }
206 }
207 })?;
208 Ok(response_rx)
209 }
210
211 fn wrap_in_envelope<M>(&self, message: M) -> (Envelope<A>, oneshot::Receiver<A::Reply>)
212 where
213 A: DeferableReplyHandler<M>,
214 M: 'static + Send + Sync + fmt::Debug,
215 {
216 let guard = self
217 .inner
218 .scheduler_client_opt
219 .as_ref()
220 .map(|scheduler_client| scheduler_client.no_advance_time_guard());
221 wrap_in_envelope(message, guard)
222 }
223
224 pub async fn send_message_with_backpressure_counter<M>(
230 &self,
231 message: M,
232 backpressure_micros_counter_opt: Option<&IntCounter>,
233 ) -> Result<oneshot::Receiver<A::Reply>, SendError>
234 where
235 A: DeferableReplyHandler<M>,
236 M: 'static + Send + Sync + fmt::Debug,
237 {
238 let (envelope, response_rx) = self.wrap_in_envelope(message);
239 match self.inner.tx.try_send_low_priority(envelope) {
240 Ok(()) => Ok(response_rx),
241 Err(TrySendError::Full(envelope)) => {
242 if let Some(backpressure_micros_counter) = backpressure_micros_counter_opt {
243 let now = Instant::now();
244 self.inner.tx.send_low_priority(envelope).await?;
245 let elapsed = now.elapsed();
246 backpressure_micros_counter.inc_by(elapsed.as_micros() as u64);
247 } else {
248 self.inner.tx.send_low_priority(envelope).await?;
249 }
250 Ok(response_rx)
251 }
252 Err(TrySendError::Disconnected) => Err(SendError::Disconnected),
253 }
254 }
255
256 pub(crate) fn send_message_with_high_priority<M>(
257 &self,
258 message: M,
259 ) -> Result<oneshot::Receiver<A::Reply>, SendError>
260 where
261 A: DeferableReplyHandler<M>,
262 M: 'static + Send + Sync + fmt::Debug,
263 {
264 let (envelope, response_rx) = self.wrap_in_envelope(message);
265 self.inner.tx.send_high_priority(envelope)?;
266 Ok(response_rx)
267 }
268
269 pub(crate) async fn send_message_with_priority<M>(
270 &self,
271 message: M,
272 priority: Priority,
273 ) -> Result<oneshot::Receiver<A::Reply>, SendError>
274 where
275 A: DeferableReplyHandler<M>,
276 M: 'static + Send + Sync + fmt::Debug,
277 {
278 let (envelope, response_rx) = self.wrap_in_envelope(message);
279 match priority {
280 Priority::High => self.inner.tx.send_high_priority(envelope)?,
281 Priority::Low => {
282 self.inner.tx.send_low_priority(envelope).await?;
283 }
284 }
285 Ok(response_rx)
286 }
287
288 pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
293 where
294 A: DeferableReplyHandler<M, Reply = T>,
295 M: 'static + Send + Sync + fmt::Debug,
296 {
297 self.ask_with_backpressure_counter(message, None).await
298 }
299
300 pub async fn ask_with_backpressure_counter<M, T>(
312 &self,
313 message: M,
314 backpressure_micros_counter_opt: Option<&IntCounter>,
315 ) -> Result<T, AskError<Infallible>>
316 where
317 A: DeferableReplyHandler<M, Reply = T>,
318 M: 'static + Send + Sync + fmt::Debug,
319 {
320 let resp = self
321 .send_message_with_backpressure_counter(message, backpressure_micros_counter_opt)
322 .await;
323 resp.map_err(|_send_error| AskError::MessageNotDelivered)?
324 .await
325 .map_err(|_| AskError::ProcessMessageError)
326 }
327
328 pub async fn ask_for_res<M, T, E>(&self, message: M) -> Result<T, AskError<E>>
333 where
334 A: DeferableReplyHandler<M, Reply = Result<T, E>>,
335 M: fmt::Debug + Send + Sync + 'static,
336 E: fmt::Debug,
337 {
338 self.send_message(message)
339 .await
340 .map_err(|_send_error| AskError::MessageNotDelivered)?
341 .await
342 .map_err(|_| AskError::ProcessMessageError)?
343 .map_err(AskError::from)
344 }
345}
346
347pub struct Inbox<A: Actor> {
348 rx: Arc<Receiver<Envelope<A>>>,
349}
350
351impl<A: Actor> Clone for Inbox<A> {
352 fn clone(&self) -> Self {
353 Inbox {
354 rx: self.rx.clone(),
355 }
356 }
357}
358
359impl<A: Actor> Inbox<A> {
360 pub(crate) fn is_empty(&self) -> bool {
361 self.rx.is_empty()
362 }
363
364 pub(crate) async fn recv(&self) -> Result<Envelope<A>, RecvError> {
365 self.rx.recv().await
366 }
367
368 pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope<A> {
369 self.rx.recv_high_priority().await
370 }
371
372 pub(crate) fn try_recv(&self) -> Result<Envelope<A>, RecvError> {
373 self.rx.try_recv()
374 }
375
376 pub async fn recv_typed_message<M: 'static>(&self) -> Option<M> {
377 while let Ok(mut envelope) = self.rx.recv().await {
378 if let Some(msg) = envelope.message_typed() {
379 return Some(msg);
380 }
381 }
382 None
383 }
384
385 #[allow(dead_code)] pub(crate) fn try_recv_cmd_and_scheduled_msg_only(&self) -> Result<Envelope<A>, RecvError> {
387 self.rx.try_recv_high_priority_message()
388 }
389
390 pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
396 self.rx
397 .drain_low_priority()
398 .into_iter()
399 .map(|mut envelope| envelope.message())
400 .collect()
401 }
402
403 pub fn drain_for_test_typed<M: 'static>(&self) -> Vec<M> {
409 self.rx
410 .drain_low_priority()
411 .into_iter()
412 .flat_map(|mut envelope| envelope.message_typed())
413 .collect()
414 }
415}
416
417pub(crate) fn create_mailbox<A: Actor>(
418 actor_name: String,
419 queue_capacity: QueueCapacity,
420 scheduler_client_opt: Option<SchedulerClient>,
421) -> (Mailbox<A>, Inbox<A>) {
422 let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
423 let ref_count = Arc::new(AtomicUsize::new(1));
424 let mailbox = Mailbox {
425 inner: Arc::new(Inner {
426 tx,
427 instance_id: crate::quickwit_common::new_coolid(&actor_name),
428 scheduler_client_opt,
429 }),
430 ref_count,
431 };
432 let inbox = Inbox { rx: Arc::new(rx) };
433 (mailbox, inbox)
434}
435
436pub struct WeakMailbox<A: Actor> {
437 inner: Weak<Inner<A>>,
438 ref_count: Weak<AtomicUsize>,
439}
440
441impl<A: Actor> WeakMailbox<A> {
442 pub fn upgrade(&self) -> Option<Mailbox<A>> {
443 let inner = self.inner.upgrade()?;
444 let ref_count = self.ref_count.upgrade()?;
445 Some(Mailbox { inner, ref_count })
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use std::mem;
452 use std::time::Duration;
453
454 use super::*;
455 use crate::tests::{Ping, PingReceiverActor};
456 use crate::Universe;
457
458 #[tokio::test]
459 async fn test_weak_mailbox_downgrade_upgrade() {
460 let universe = Universe::with_accelerated_time();
461 let (mailbox, _inbox) = universe.create_test_mailbox::<PingReceiverActor>();
462 let weak_mailbox = mailbox.downgrade();
463 assert!(weak_mailbox.upgrade().is_some());
464 }
465
466 #[tokio::test]
467 async fn test_weak_mailbox_failing_upgrade() {
468 let universe = Universe::with_accelerated_time();
469 let (mailbox, _inbox) = universe.create_test_mailbox::<PingReceiverActor>();
470 let weak_mailbox = mailbox.downgrade();
471 drop(mailbox);
472 assert!(weak_mailbox.upgrade().is_none());
473 }
474
475 struct BackPressureActor;
476
477 impl Actor for BackPressureActor {
478 type ObservableState = ();
479
480 fn observable_state(&self) -> Self::ObservableState {}
481
482 fn queue_capacity(&self) -> QueueCapacity {
483 QueueCapacity::Bounded(0)
484 }
485
486 fn yield_after_each_message(&self) -> bool {
487 false
488 }
489 }
490
491 use async_trait::async_trait;
492
493 #[async_trait]
494 impl Handler<Duration> for BackPressureActor {
495 type Reply = ();
496
497 async fn handle(
498 &mut self,
499 sleep_duration: Duration,
500 _ctx: &ActorContext<Self>,
501 ) -> Result<(), ActorExitStatus> {
502 if !sleep_duration.is_zero() {
503 tokio::time::sleep(sleep_duration).await;
504 }
505 Ok(())
506 }
507 }
508
509 #[tokio::test]
510 async fn test_mailbox_send_with_backpressure_counter_low_backpressure() {
511 let universe = Universe::with_accelerated_time();
512 let back_pressure_actor = BackPressureActor;
513 let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
514 mailbox
517 .ask_with_backpressure_counter(Duration::default(), None)
518 .await
519 .unwrap();
520 let backpressure_micros_counter =
522 IntCounter::new("test_counter", "help for test_counter").unwrap();
523 let wait_duration = Duration::from_millis(1);
524 let processed = mailbox
525 .send_message_with_backpressure_counter(
526 wait_duration,
527 Some(&backpressure_micros_counter),
528 )
529 .await
530 .unwrap();
531 assert!(backpressure_micros_counter.get() < 500);
532 processed.await.unwrap();
533 assert!(backpressure_micros_counter.get() < 500);
534 universe.assert_quit().await;
535 }
536
537 #[tokio::test]
538 async fn test_mailbox_send_with_backpressure_counter_backpressure() {
539 let universe = Universe::with_accelerated_time();
540 let back_pressure_actor = BackPressureActor;
541 let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
542 mailbox
545 .ask_with_backpressure_counter(Duration::default(), None)
546 .await
547 .unwrap();
548 let backpressure_micros_counter =
549 IntCounter::new("test_counter", "help for test_counter").unwrap();
550 let wait_duration = Duration::from_millis(1);
551 mailbox
552 .send_message_with_backpressure_counter(
553 wait_duration,
554 Some(&backpressure_micros_counter),
555 )
556 .await
557 .unwrap();
558 mailbox
561 .send_message_with_backpressure_counter(
562 Duration::default(),
563 Some(&backpressure_micros_counter),
564 )
565 .await
566 .unwrap();
567 assert!(backpressure_micros_counter.get() > 1_000u64);
568 universe.assert_quit().await;
569 }
570
571 #[tokio::test]
572 async fn test_mailbox_waiting_for_processing_does_not_counter_as_backpressure() {
573 let universe = Universe::with_accelerated_time();
574 let back_pressure_actor = BackPressureActor;
575 let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
576 mailbox
577 .ask_with_backpressure_counter(Duration::default(), None)
578 .await
579 .unwrap();
580 let backpressure_micros_counter =
581 IntCounter::new("test_counter", "help for test_counter").unwrap();
582 let start = Instant::now();
583 mailbox
584 .ask_with_backpressure_counter(Duration::from_millis(1), None)
585 .await
586 .unwrap();
587 let elapsed = start.elapsed();
588 assert!(elapsed.as_micros() > 1000);
589 assert_eq!(backpressure_micros_counter.get(), 0);
590 universe.assert_quit().await;
591 }
592
593 #[tokio::test]
594 async fn test_try_send() {
595 let universe = Universe::with_accelerated_time();
596 let (mailbox, _inbox) = universe
597 .create_mailbox::<PingReceiverActor>("hello".to_string(), QueueCapacity::Bounded(1));
598 assert!(mailbox.try_send_message(Ping).is_ok());
599 assert!(matches!(
600 mailbox.try_send_message(Ping).unwrap_err(),
601 TrySendError::Full(Ping)
602 ));
603 }
604
605 #[tokio::test]
606 async fn test_try_send_disconnect() {
607 let universe = Universe::with_accelerated_time();
608 let (mailbox, inbox) = universe
609 .create_mailbox::<PingReceiverActor>("hello".to_string(), QueueCapacity::Bounded(1));
610 assert!(mailbox.try_send_message(Ping).is_ok());
611 mem::drop(inbox);
612 assert!(matches!(
613 mailbox.try_send_message(Ping).unwrap_err(),
614 TrySendError::Disconnected
615 ));
616 }
617}