witty_actors/
lib.rs

1// Copyright (C) 2023 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20#![deny(clippy::disallowed_methods)]
21
22//! quickwit-actors is a simplified actor framework for quickwit.
23//!
24//! It solves the following problem:
25//! - have sync and async tasks communicate together.
//! - make these tasks observable
//! - make these tasks modular and testable
28//! - detect when some task is stuck and does not progress anymore
29
30use std::fmt;
31
32mod quickwit_common;
33
34use crate::quickwit_common::proto::{ServiceError, ServiceErrorCode};
35use tokio::time::Duration;
36mod actor;
37mod actor_context;
38mod actor_handle;
39mod actor_state;
40#[doc(hidden)]
41pub mod channel_with_priority;
42mod command;
43mod envelope;
44mod mailbox;
45mod observation;
46mod registry;
47pub(crate) mod scheduler;
48mod spawn_builder;
49mod supervisor;
50
51pub use scheduler::{start_scheduler, SchedulerClient};
52
53#[cfg(test)]
54pub(crate) mod tests;
55mod universe;
56
57use crate::quickwit_common::KillSwitch;
58pub use actor::{Actor, ActorExitStatus, DeferableReplyHandler, Handler};
59pub use actor_handle::{ActorHandle, Health, Healthz, Supervisable};
60pub use command::Command;
61pub use observation::{Observation, ObservationType};
62pub use spawn_builder::SpawnContext;
63use thiserror::Error;
64pub use universe::Universe;
65
66pub use self::actor_context::ActorContext;
67pub use self::actor_state::ActorState;
68pub use self::channel_with_priority::{QueueCapacity, RecvError, SendError, TrySendError};
69pub use self::mailbox::{Inbox, Mailbox};
70pub use self::registry::ActorObservation;
71pub use self::supervisor::{Supervisor, SupervisorState};
72
/// Interval within which every actor is expected to report progress.
///
/// An actor that does not advertise any progress during a full `HEARTBEAT`
/// interval is considered blocked by its supervisor, which then kills it along
/// with all of the actors that share the same killswitch.
///
/// Test builds (`cfg(test)` or the `testsuite` feature) use 500 ms instead of
/// 3 s: some unit tests only end once a pipeline's termination has been
/// detected, which can require waiting for a heartbeat, so a shorter interval
/// reduces the time spent running unit tests.
pub const HEARTBEAT: Duration = Duration::from_millis(if cfg!(any(test, feature = "testsuite")) {
    500
} else {
    3_000
});
88
/// Upper bound on how long we wait for a fresh observation.
///
/// When this delay expires without a new observation, the last known
/// observation is returned instead.
const OBSERVE_TIMEOUT: Duration = Duration::from_millis(3_000);
93
94/// Error that occurred while calling `ActorContext::ask(..)` or `Universe::ask`
95#[derive(Error, Debug)]
96pub enum AskError<E: fmt::Debug> {
97    #[error("Message could not be delivered")]
98    MessageNotDelivered,
99    #[error("Error while the message was being processed.")]
100    ProcessMessageError,
101    #[error("The handler returned an error: `{0:?}`.")]
102    ErrorReply(#[from] E),
103}
104
105impl<E: fmt::Debug + ServiceError> ServiceError for AskError<E> {
106    fn status_code(&self) -> ServiceErrorCode {
107        match self {
108            AskError::MessageNotDelivered => ServiceErrorCode::Internal,
109            AskError::ProcessMessageError => ServiceErrorCode::Internal,
110            AskError::ErrorReply(err) => err.status_code(),
111        }
112    }
113}