use std::time::Duration;
use crate::{
coordinator::{Coordinator, CoordinatorExecutionError},
runtime::RuntimeFlavor,
snapshot::PersistenceBackend,
worker::{StreamProvider, WorkerBuilder, WorkerExecutionError},
};
use super::{communication::InterThreadCommunication, Shared};
use bon::Builder;
use thiserror::Error;
/// Runtime that drives a job with a single worker on the calling thread and a coordinator on a
/// separately spawned thread (see `execute`).
///
/// Instances are assembled through the builder generated by `bon`'s `Builder` derive.
#[derive(Builder)]
pub struct SingleThreadRuntime<P, F> {
// Callback invoked exactly once with the worker (as `&mut dyn StreamProvider`) before execution
// starts. Marked `finish_fn`, i.e. it is supplied to the builder's finishing function rather
// than through a named setter.
#[builder(finish_fn)]
build: F,
// Persistence backend; a clone is handed to the worker and the original to the coordinator.
persistence: P,
// Forwarded unchanged to the coordinator's `execute` call.
// NOTE(review): presumably the interval between snapshots, `None` disabling them — confirm
// against `Coordinator::execute`.
snapshots: Option<Duration>,
}
impl<P, F> SingleThreadRuntime<P, F>
where
    P: PersistenceBackend + Clone + Send,
    F: FnOnce(&mut dyn StreamProvider),
{
    /// Sets up the worker via the user-supplied build callback, starts the coordinator on a
    /// spawned thread and then calls the worker's `execute` on the calling thread.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionError::Worker`] when the worker's `execute` call fails.
    ///
    /// # Panics
    ///
    /// Panics if the flavor fails to create the coordinator's communication handle; the
    /// single-thread flavor always returns `Ok`, so this is treated as an invariant.
    pub fn execute(self) -> Result<(), ExecutionError> {
        // Take the runtime apart once so each piece can be moved to where it is needed.
        let Self {
            build,
            persistence,
            snapshots,
        } = self;

        let mut flavor = SingleThreadRuntimeFlavor::default();

        // The worker receives its own clones of the flavor and the persistence backend; the
        // build callback then configures it.
        let mut worker_builder = WorkerBuilder::new(flavor.clone(), persistence.clone());
        build(&mut worker_builder);

        let (coordinator, _) = Coordinator::new();
        let coordinator_comm = flavor
            .communication()
            .expect("SingleThread communication is infallible");

        // NOTE(review): the handle is only held until this function returns and is never
        // joined, so the coordinator's result (error or panic) is not observed here.
        // The leading `1` is presumably the number of workers — confirm against
        // `Coordinator::execute`.
        let _coordinator_handle = std::thread::spawn(move || {
            coordinator.execute(1, snapshots, persistence, coordinator_comm)
        });

        worker_builder.execute()?;
        Ok(())
    }
}
/// Errors that can be reported when executing the runtime.
#[derive(Debug, Error)]
pub enum ExecutionError {
/// The worker failed. Wraps the underlying `WorkerExecutionError`; `#[from]` makes it usable
/// with the `?` operator.
#[error("Error executing worker")]
Worker(#[from] WorkerExecutionError),
/// The coordinator failed. Wraps the underlying `CoordinatorExecutionError`; `#[from]` makes it
/// usable with the `?` operator.
#[error("Error executing coordinator")]
Coordinator(#[from] CoordinatorExecutionError),
/// Joining the coordinator thread failed. The payload is a boxed `Any + Send` value, formatted
/// with `{:?}` in the error message.
// NOTE(review): presumably the panic payload returned by `JoinHandle::join` — confirm at the
// construction site.
#[error("Error joining coordinator thread: {0:?}")]
CoordinatorJoin(Box<dyn std::any::Any + std::marker::Send>),
}
/// `RuntimeFlavor` used by the single-thread runtime.
///
/// Cloning the flavor clones its `Shared` value; communication handles are created from clones
/// of that same value.
#[derive(Debug, Default, Clone)]
pub struct SingleThreadRuntimeFlavor {
// State passed (as a clone) to every `InterThreadCommunication` this flavor creates.
comm_shared: Shared,
}
impl RuntimeFlavor for SingleThreadRuntimeFlavor {
    type Communication = InterThreadCommunication;

    /// Creates an inter-thread communication handle backed by a clone of this flavor's shared
    /// state.
    ///
    /// This implementation always returns `Ok`.
    fn communication(
        &mut self,
    ) -> Result<Self::Communication, crate::runtime::runtime_flavor::CommunicationError> {
        let shared = self.comm_shared.clone();
        // NOTE(review): the `0` is presumably this worker's id — confirm against
        // `InterThreadCommunication::new`.
        let handle = InterThreadCommunication::new(shared, 0);
        Ok(handle)
    }

    /// Returns the id of the worker running under this flavor, which is always `0`.
    fn this_worker_id(&self) -> u64 {
        0
    }
}