studiole_command/services/
worker.rs1use crate::prelude::*;
2use std::thread::current;
3use tokio::runtime::Handle;
4use tokio::sync::futures::Notified;
5
/// Identifier assigned to a [`Worker`]; it tags the worker's log events and
/// appears in its `Display` output.
pub type WorkerId = usize;
8
9pub enum Instruction<'a, T: ICommandInfo> {
11 Wait(Notified<'a>),
12 Stop,
13 Execute(T::Request, T::Command),
14}
15
/// A worker task spawned onto the Tokio runtime.
///
/// Created by `Worker::new`, which spawns the instruction loop and keeps the
/// resulting join handle so the task can be awaited later.
pub struct Worker {
    /// Identifier used in log events and in the `Display` output.
    id: WorkerId,
    /// Join handle of the spawned task that runs the instruction loop.
    handle: JoinHandle<()>,
}
23
24impl Worker {
25 pub(super) fn new<T: ICommandInfo + 'static>(
26 id: WorkerId,
27 mediator: Arc<CommandMediator<T>>,
28 ) -> Self {
29 let thread = current().id();
30 trace!(worker = id, ?thread, "Spawning");
31 let handle = Handle::current().spawn(async move {
32 internal_loop(mediator, id).await;
33 });
34 trace!(worker = id, "Spawned");
35 Self { id, handle }
36 }
37
38 pub(super) async fn wait_for_stop(self) {
40 let _ = self.handle.await;
41 }
42}
43
44async fn internal_loop<T: ICommandInfo>(mediator: Arc<CommandMediator<T>>, worker: WorkerId) {
45 let thread = current().id();
46 trace!(worker, ?thread, "Starting");
47 loop {
48 match mediator.get_instruction().await {
49 Instruction::Execute(request, command) => {
50 let command_id = command.to_string();
51 debug!(worker, ?thread, %command, "Executing");
52 let result = command.execute().await;
53 mediator.completed(request, result).await;
54 trace!(worker, command = command_id, "Executed");
55 }
56 Instruction::Wait(notified) => {
57 trace!(worker, "Waiting");
58 notified.await;
59 }
60 Instruction::Stop => {
61 trace!(worker, "Stopping");
62 break;
63 }
64 }
65 }
66 trace!(worker, ?thread, "Stopped");
67}
68
69impl Display for Worker {
70 fn fmt(&self, formatter: &mut Formatter<'_>) -> FmtResult {
71 write!(formatter, "Worker {:02}", self.id)
72 }
73}