monolake_core/orchestrator/
mod.rs

1//! Worker and service lifecycle management for thread-per-core network services.
2//!
3//! This module provides the core functionality for managing workers and services
4//! in a thread-per-core architecture. It implements a flexible and efficient system
5//! for deploying, updating, and managing services across multiple worker threads.
6//!
7//! # Key Components
8//!
9//! - [`WorkerManager`]: Manages the entire fleet of worker threads.
10//! - [`ServiceExecutor`]: Handles service lifecycle within a single worker thread.
11//! - [`ServiceDeploymentContainer`]: Manages the deployment and updates of individual services.
12//! - [`ServiceCommand`]: Represents actions to be performed on services.
13//! - [`ResultGroup`]: Aggregates results from operations across multiple workers.
14//!
15//! # Deployment Models
16//!
17//! This module supports two deployment models:
18//!
19//! 1. Two-Stage Deployment: For updating services with state preservation.
20//!    - Precommit a service using [`Precommit`](ServiceCommand::Precommit).
21//!    - Update or commit using [`Update`](ServiceCommand::Update) or
22//!      [`Commit`](ServiceCommand::Commit).
23//!
24//! 2. Single-Stage Deployment: For initial deployments or stateless updates.
25//!    - Deploy in one step using [`PrepareAndCommit`](ServiceCommand::PrepareAndCommit).
26//!
27//! # Service Lifecycle
28//!
29//! Services can be dynamically updated while the system is running:
30//! - Existing connections continue using the current service version.
31//! - New connections use the latest deployed version.
32//!
33//! This module is designed to work seamlessly with the `service_async` crate,
34//! leveraging its [`Service`] and [`AsyncMakeService`](service_async::AsyncMakeService)
35//! traits for efficient service creation and management.
36use std::fmt::Debug;
37
38use futures_channel::oneshot::Sender as OSender;
39use monoio::io::stream::Stream;
40use service_async::Service;
41use tracing::{debug, error, info, warn};
42
43use self::runtime::RuntimeWrapper;
44
45mod runtime;
46mod service_executor;
47mod worker_manager;
48
49pub use service_executor::{
50    Execute, ServiceCommand, ServiceCommandTask, ServiceDeploymentContainer, ServiceExecutor,
51    ServiceSlot,
52};
53pub use worker_manager::{JoinHandlesWithOutput, WorkerManager};
54
/// A collection of results from multiple worker operations.
///
/// [`ResultGroup`] is typically used to aggregate the results of dispatching
/// a [`ServiceCommand`] to multiple workers in a [`WorkerManager`].
/// It provides a convenient way to handle and process multiple results as a single unit.
///
/// The group is a thin wrapper around a `Vec<Result<T, E>>`: it can be built from such a
/// vector and turned back into one through the `From` conversions implemented for it.
///
/// `Debug` is derived so that a group can be logged or used in assertion messages
/// whenever both `T` and `E` are themselves `Debug`.
#[derive(Debug)]
pub struct ResultGroup<T, E>(Vec<Result<T, E>>);
61
62impl<T, E> From<Vec<Result<T, E>>> for ResultGroup<T, E> {
63    fn from(value: Vec<Result<T, E>>) -> Self {
64        Self(value)
65    }
66}
67
68impl<T, E> From<ResultGroup<T, E>> for Vec<Result<T, E>> {
69    fn from(value: ResultGroup<T, E>) -> Self {
70        value.0
71    }
72}
73
74impl<E> ResultGroup<(), E> {
75    pub fn err(self) -> Result<(), E> {
76        for r in self.0.into_iter() {
77            r?;
78        }
79        Ok(())
80    }
81}
82
83/// Serves incoming connections using the provided listener and service.
84///
85/// This function runs a loop that continuously accepts new connections and handles them
86/// using the provided service. It can be gracefully stopped using the provided `stop` channel.
87///
88/// # Behavior
89///
90/// The function will run until one of the following occurs:
91/// - The `stop` channel is triggered, indicating a graceful shutdown.
92/// - The listener closes, indicating no more incoming connections.
93///
94/// For each accepted connection, a new task is spawned to handle it using the provided service.
95pub async fn serve<S, Svc, A, E>(mut listener: S, handler: ServiceSlot<Svc>, mut stop: OSender<()>)
96where
97    S: Stream<Item = Result<A, E>> + 'static,
98    E: Debug,
99    Svc: Service<A> + 'static,
100    Svc::Error: Debug,
101    A: 'static,
102{
103    let mut cancellation = stop.cancellation();
104    loop {
105        monoio::select! {
106            _ = &mut cancellation => {
107                info!("server is notified to stop");
108                break;
109            }
110            accept_opt = listener.next() => {
111                let accept = match accept_opt {
112                    Some(accept) => accept,
113                    None => {
114                        info!("listener is closed, serve stopped");
115                        return;
116                    }
117                };
118                match accept {
119                    Ok(accept) => {
120                        let svc = handler.get_svc();
121                        monoio::spawn(async move {
122                            match svc.call(accept).await {
123                                Ok(_) => {
124                                    debug!("Connection complete");
125                                }
126                                Err(e) => {
127                                    error!("Connection error: {e:?}");
128                                }
129                            }
130                        });
131                    }
132                    Err(e) => warn!("Accept connection failed: {e:?}"),
133                }
134            }
135        }
136    }
137}