use std::{
sync::{mpsc, Arc},
thread,
time::Duration,
};
use crate::stream_engine::autonomous_executor::{
args::{Coordinators, EventQueues},
event_queue::{Event, EventPoll, EventTag, NonBlockingEventQueue},
main_job_lock::MainJobLock,
memory_state_machine::MemoryStateTransition,
performance_metrics::{
MetricsUpdateByTaskExecutionOrPurge, PerformanceMetrics, PerformanceMetricsSummary,
},
pipeline_derivatives::PipelineDerivatives,
worker::worker_handle::WorkerSetupCoordinator,
};
pub trait WorkerThreadLoopState {
type ThreadArg: Send + 'static;
fn new(thread_arg: &Self::ThreadArg) -> Self
where
Self: Sized;
fn is_integral(&self) -> bool;
}
pub trait WorkerThread {
const THREAD_NAME: &'static str;
type ThreadArg: Send + 'static;
type LoopState: WorkerThreadLoopState<ThreadArg = Self::ThreadArg>;
fn event_subscription() -> Vec<EventTag>;
fn main_loop_cycle(
current_state: Self::LoopState,
thread_arg: &Self::ThreadArg,
event_queue: &NonBlockingEventQueue,
) -> Self::LoopState;
fn ev_update_pipeline(
current_state: Self::LoopState,
pipeline_derivatives: Arc<PipelineDerivatives>,
thread_arg: &Self::ThreadArg,
event_queue: Arc<NonBlockingEventQueue>,
) -> Self::LoopState;
fn ev_replace_performance_metrics(
current_state: Self::LoopState,
metrics: Arc<PerformanceMetrics>,
thread_arg: &Self::ThreadArg,
event_queue: Arc<NonBlockingEventQueue>,
) -> Self::LoopState;
fn ev_incremental_update_metrics(
current_state: Self::LoopState,
metrics: Arc<MetricsUpdateByTaskExecutionOrPurge>,
thread_arg: &Self::ThreadArg,
event_queue: Arc<NonBlockingEventQueue>,
) -> Self::LoopState;
fn ev_report_metrics_summary(
current_state: Self::LoopState,
metrics_summary: Arc<PerformanceMetricsSummary>,
thread_arg: &Self::ThreadArg,
event_queue: Arc<NonBlockingEventQueue>,
) -> Self::LoopState;
fn ev_transit_memory_state(
current_state: Self::LoopState,
memory_state_transition: Arc<MemoryStateTransition>,
thread_arg: &Self::ThreadArg,
event_queue: Arc<NonBlockingEventQueue>,
) -> Self::LoopState;
fn run(
main_job_lock: Arc<MainJobLock>,
event_queues: EventQueues,
stop_receiver: mpsc::Receiver<()>,
coordinators: Coordinators,
thread_arg: Self::ThreadArg,
) {
let event_polls = Self::event_subscription()
.into_iter()
.map(|ev| match ev {
EventTag::Blocking(_) => event_queues.blocking.subscribe(ev),
EventTag::NonBlocking(_) => event_queues.non_blocking.subscribe(ev),
})
.collect();
let _ = thread::Builder::new()
.name(Self::THREAD_NAME.into())
.spawn(move || {
Self::main_loop(
main_job_lock,
event_queues.non_blocking,
event_polls,
stop_receiver,
coordinators,
thread_arg,
)
});
}
fn main_loop(
main_job_lock: Arc<MainJobLock>,
event_queue: Arc<NonBlockingEventQueue>,
event_polls: Vec<EventPoll>,
stop_receiver: mpsc::Receiver<()>,
coordinators: Coordinators,
thread_arg: Self::ThreadArg,
) {
log::info!("[{}] main loop started", Self::THREAD_NAME);
Self::setup_ready(coordinators.worker_setup_coordinator.clone());
coordinators
.worker_setup_coordinator
.sync_wait_all_workers();
let mut state = Self::LoopState::new(&thread_arg);
while stop_receiver.try_recv().is_err() {
if let Ok(_lock) = main_job_lock.try_main_job() {
if state.is_integral() {
state = Self::main_loop_cycle(state, &thread_arg, event_queue.as_ref());
} else {
log::debug!("[{}] main_loop(): state is not integral", Self::THREAD_NAME);
thread::sleep(Duration::from_millis(100));
}
}
state = Self::handle_events(state, &event_polls, &thread_arg, event_queue.clone());
}
log::info!(
"[{}] main loop finished. Synchronize other threads to finish...",
Self::THREAD_NAME
);
coordinators.worker_stop_coordinator.sync_wait_all_workers();
}
fn setup_ready(worker_setup_coordinator: Arc<WorkerSetupCoordinator>);
fn handle_events(
current_state: Self::LoopState,
event_polls: &[EventPoll],
thread_arg: &Self::ThreadArg,
nb_event_queue: Arc<NonBlockingEventQueue>, ) -> Self::LoopState {
let mut state = current_state;
for event_poll in event_polls {
#[allow(clippy::single_match)]
while let Some(ev) = event_poll.poll() {
match ev {
Event::UpdatePipeline {
pipeline_derivatives,
} => {
state = Self::ev_update_pipeline(
state,
pipeline_derivatives,
thread_arg,
nb_event_queue.clone(),
);
}
Event::ReplacePerformanceMetrics { metrics } => {
state = Self::ev_replace_performance_metrics(
state,
metrics,
thread_arg,
nb_event_queue.clone(),
);
}
Event::IncrementalUpdateMetrics {
metrics_update_by_task_execution_or_purge,
} => {
state = Self::ev_incremental_update_metrics(
state,
metrics_update_by_task_execution_or_purge,
thread_arg,
nb_event_queue.clone(),
)
}
Event::ReportMetricsSummary { metrics_summary } => {
state = Self::ev_report_metrics_summary(
state,
metrics_summary,
thread_arg,
nb_event_queue.clone(),
)
}
Event::TransitMemoryState {
memory_state_transition,
} => {
state = Self::ev_transit_memory_state(
state,
memory_state_transition,
thread_arg,
nb_event_queue.clone(),
)
}
}
}
}
state
}
}