monolake_core/orchestrator/mod.rs
1//! Worker and service lifecycle management for thread-per-core network services.
2//!
3//! This module provides the core functionality for managing workers and services
4//! in a thread-per-core architecture. It implements a flexible and efficient system
5//! for deploying, updating, and managing services across multiple worker threads.
6//!
7//! # Key Components
8//!
9//! - [`WorkerManager`]: Manages the entire fleet of worker threads.
10//! - [`ServiceExecutor`]: Handles service lifecycle within a single worker thread.
11//! - [`ServiceDeploymentContainer`]: Manages the deployment and updates of individual services.
12//! - [`ServiceCommand`]: Represents actions to be performed on services.
13//! - [`ResultGroup`]: Aggregates results from operations across multiple workers.
14//!
15//! # Deployment Models
16//!
17//! This module supports two deployment models:
18//!
19//! 1. Two-Stage Deployment: For updating services with state preservation.
20//! - Precommit a service using [`Precommit`](ServiceCommand::Precommit).
21//! - Update or commit using [`Update`](ServiceCommand::Update) or
22//! [`Commit`](ServiceCommand::Commit).
23//!
24//! 2. Single-Stage Deployment: For initial deployments or stateless updates.
25//! - Deploy in one step using [`PrepareAndCommit`](ServiceCommand::PrepareAndCommit).
26//!
27//! # Service Lifecycle
28//!
29//! Services can be dynamically updated while the system is running:
30//! - Existing connections continue using the current service version.
31//! - New connections use the latest deployed version.
32//!
33//! This module is designed to work seamlessly with the `service_async` crate,
34//! leveraging its [`Service`] and [`AsyncMakeService`](service_async::AsyncMakeService)
35//! traits for efficient service creation and management.
36use std::fmt::Debug;
37
38use futures_channel::oneshot::Sender as OSender;
39use monoio::io::stream::Stream;
40use service_async::Service;
41use tracing::{debug, error, info, warn};
42
43use self::runtime::RuntimeWrapper;
44
45mod runtime;
46mod service_executor;
47mod worker_manager;
48
49pub use service_executor::{
50 Execute, ServiceCommand, ServiceCommandTask, ServiceDeploymentContainer, ServiceExecutor,
51 ServiceSlot,
52};
53pub use worker_manager::{JoinHandlesWithOutput, WorkerManager};
54
/// A collection of results from multiple worker operations.
///
/// [`ResultGroup`] is typically used to aggregate the results of dispatching
/// a [`ServiceCommand`] to multiple workers in a [`WorkerManager`].
/// It provides a convenient way to handle and process multiple results as a single unit.
///
/// The wrapper is a thin newtype over `Vec<Result<T, E>>`: the `From` impls below convert
/// in both directions without touching the elements or their order.
#[derive(Debug)]
pub struct ResultGroup<T, E>(Vec<Result<T, E>>);

impl<T, E> From<Vec<Result<T, E>>> for ResultGroup<T, E> {
    /// Wraps an existing vector of results; element order is preserved.
    fn from(value: Vec<Result<T, E>>) -> Self {
        Self(value)
    }
}

impl<T, E> From<ResultGroup<T, E>> for Vec<Result<T, E>> {
    /// Unwraps the group back into the underlying vector of results.
    fn from(value: ResultGroup<T, E>) -> Self {
        value.0
    }
}

impl<E> ResultGroup<(), E> {
    /// Collapses the whole group into a single `Result`.
    ///
    /// Returns `Ok(())` when every entry is `Ok(())`, which includes the empty group.
    ///
    /// # Errors
    ///
    /// Returns the first `Err` encountered in vector order. Entries after it are dropped
    /// without being inspected, so any further errors are discarded.
    pub fn err(self) -> Result<(), E> {
        // Collecting `Result<(), E>` items into `Result<(), E>` stops at the first `Err`,
        // exactly like looping and applying `?` to each entry.
        self.0.into_iter().collect()
    }
}
82
83/// Serves incoming connections using the provided listener and service.
84///
85/// This function runs a loop that continuously accepts new connections and handles them
86/// using the provided service. It can be gracefully stopped using the provided `stop` channel.
87///
88/// # Behavior
89///
90/// The function will run until one of the following occurs:
91/// - The `stop` channel is triggered, indicating a graceful shutdown.
92/// - The listener closes, indicating no more incoming connections.
93///
94/// For each accepted connection, a new task is spawned to handle it using the provided service.
95pub async fn serve<S, Svc, A, E>(mut listener: S, handler: ServiceSlot<Svc>, mut stop: OSender<()>)
96where
97 S: Stream<Item = Result<A, E>> + 'static,
98 E: Debug,
99 Svc: Service<A> + 'static,
100 Svc::Error: Debug,
101 A: 'static,
102{
103 let mut cancellation = stop.cancellation();
104 loop {
105 monoio::select! {
106 _ = &mut cancellation => {
107 info!("server is notified to stop");
108 break;
109 }
110 accept_opt = listener.next() => {
111 let accept = match accept_opt {
112 Some(accept) => accept,
113 None => {
114 info!("listener is closed, serve stopped");
115 return;
116 }
117 };
118 match accept {
119 Ok(accept) => {
120 let svc = handler.get_svc();
121 monoio::spawn(async move {
122 match svc.call(accept).await {
123 Ok(_) => {
124 debug!("Connection complete");
125 }
126 Err(e) => {
127 error!("Connection error: {e:?}");
128 }
129 }
130 });
131 }
132 Err(e) => warn!("Accept connection failed: {e:?}"),
133 }
134 }
135 }
136 }
137}