Skip to main content

studiole_command/services/
worker.rs

1use crate::prelude::*;
2use std::thread::current;
3use tokio::runtime::Handle;
4use tokio::sync::futures::Notified;
5
/// Identifier that distinguishes one [`Worker`] from another.
pub type WorkerId = usize;
8
9/// An instruction sent to a [`Worker`].
10pub enum Instruction<'a, T: ICommandInfo> {
11    Wait(Notified<'a>),
12    Stop,
13    Execute(T::Request, T::Command),
14}
15
16/// A worker that executes commands
17///
18/// The worker is instructed by a [`CommandMediator`].
19pub struct Worker {
20    id: WorkerId,
21    handle: JoinHandle<()>,
22}
23
24impl Worker {
25    pub(super) fn new<T: ICommandInfo + 'static>(
26        id: WorkerId,
27        mediator: Arc<CommandMediator<T>>,
28    ) -> Self {
29        let thread = current().id();
30        trace!(worker = id, ?thread, "Spawning");
31        let handle = Handle::current().spawn(async move {
32            internal_loop(mediator, id).await;
33        });
34        trace!(worker = id, "Spawned");
35        Self { id, handle }
36    }
37
38    /// Wait for the worker to stop
39    pub(super) async fn wait_for_stop(self) {
40        let _ = self.handle.await;
41    }
42}
43
44async fn internal_loop<T: ICommandInfo>(mediator: Arc<CommandMediator<T>>, worker: WorkerId) {
45    let thread = current().id();
46    trace!(worker, ?thread, "Starting");
47    loop {
48        match mediator.get_instruction().await {
49            Instruction::Execute(request, command) => {
50                let command_id = command.to_string();
51                debug!(worker, ?thread, %command, "Executing");
52                let result = command.execute().await;
53                mediator.completed(request, result).await;
54                trace!(worker, command = command_id, "Executed");
55            }
56            Instruction::Wait(notified) => {
57                trace!(worker, "Waiting");
58                notified.await;
59            }
60            Instruction::Stop => {
61                trace!(worker, "Stopping");
62                break;
63            }
64        }
65    }
66    trace!(worker, ?thread, "Stopped");
67}
68
69impl Display for Worker {
70    fn fmt(&self, formatter: &mut Formatter<'_>) -> FmtResult {
71        write!(formatter, "Worker {:02}", self.id)
72    }
73}