studiole_command/services/
command_mediator.rs1use crate::prelude::*;
2use tokio::sync::broadcast::{Receiver, Sender, channel};
3
4pub(crate) const CHANNEL_CAPACITY: usize = 1024;
11
12pub struct CommandMediator<T: ICommandInfo> {
14 events: Sender<T::Event>,
16 queue: Mutex<VecDeque<T::Request>>,
18 commands: Mutex<HashMap<T::Request, CommandStatus<T>>>,
20 notify_workers: Notify,
22 runner_status: Mutex<RunnerStatus>,
24}
25
26impl<T: ICommandInfo + 'static> Service for CommandMediator<T> {
27 type Error = Infallible;
28
29 async fn from_services(_services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
30 Ok(Self::new())
31 }
32}
33
34impl<T: ICommandInfo> CommandMediator<T> {
35 pub(super) fn new() -> Self {
36 let (events, _) = channel::<T::Event>(CHANNEL_CAPACITY);
37 Self {
38 events,
39 queue: Mutex::default(),
40 notify_workers: Notify::default(),
41 runner_status: Mutex::default(),
42 commands: Mutex::default(),
43 }
44 }
45
46 async fn get_runner_status(&self) -> RunnerStatus {
47 *self.runner_status.lock().await
48 }
49}
50
51impl<T: ICommandInfo> CommandMediator<T> {
53 pub(super) async fn set_runner_status(&self, status: RunnerStatus) {
54 trace!(?status, "Set runner status");
55 let mut status_guard = self.runner_status.lock().await;
56 *status_guard = status;
57 drop(status_guard);
58 self.notify_workers.notify_waiters();
59 }
60
61 pub(super) async fn queue(&self, request: T::Request, command: T::Command) -> bool {
67 trace!(?request, "Queueing");
68 let mut commands = self.commands.lock().await;
69 if let Some(CommandStatus::Queued(_) | CommandStatus::Executing) = commands.get(&request) {
70 trace!(?request, "Skipping as already queued or executing");
71 return false;
72 }
73 commands.insert(request.clone(), CommandStatus::Queued(command));
74 drop(commands);
75 let _ = self
76 .events
77 .send(T::Event::new(EventKind::Queued, request.clone(), None));
78 let mut queue = self.queue.lock().await;
79 queue.push_back(request.clone());
80 drop(queue);
81 trace!(?request, "Queued");
82 trace!(?request, "Notifying worker");
83 self.notify_workers.notify_one();
84 true
85 }
86
87 pub(super) async fn get_commands(
92 &self,
93 ) -> MutexGuard<'_, HashMap<T::Request, CommandStatus<T>>> {
94 self.commands.lock().await
95 }
96}
97
98impl<T: ICommandInfo> CommandMediator<T> {
100 #[allow(clippy::panic)]
102 pub(super) async fn get_instruction(&self) -> Instruction<'_, T> {
103 let notify = self.notify_workers.notified();
104 let mut queue_guard = self.queue.lock().await;
105 if self.get_runner_status().await == RunnerStatus::Stopping {
106 return Instruction::Stop;
107 }
108 if let Some(request) = queue_guard.pop_front() {
109 drop(queue_guard);
110 let _ = self
111 .events
112 .send(T::Event::new(EventKind::Executing, request.clone(), None));
113 let mut commands = self.commands.lock().await;
114 let option = commands.insert(request.clone(), CommandStatus::Executing);
115 drop(commands);
116 let Some(CommandStatus::Queued(command)) = option else {
117 panic!("command should be queued but was {option:?}");
118 };
119 return Instruction::Execute(request, command);
120 }
121 drop(queue_guard);
122 if self.get_runner_status().await == RunnerStatus::Draining {
123 return Instruction::Stop;
124 }
125 Instruction::Wait(notify)
126 }
127
128 pub(super) async fn completed(
130 &self,
131 request: T::Request,
132 result: Result<T::Success, T::Failure>,
133 ) {
134 let mut commands = self.commands.lock().await;
135 match result {
136 Ok(success) => {
137 trace!(?request, "Command succeeded");
138 commands.insert(request.clone(), CommandStatus::Succeeded(success.clone()));
139 let _ =
140 self.events
141 .send(T::Event::new(EventKind::Succeeded, request, Some(success)));
142 }
143 Err(failure) => {
144 warn!(?request, error = ?failure, "Command failed");
145 commands.insert(request.clone(), CommandStatus::Failed(failure));
146 let _ = self
147 .events
148 .send(T::Event::new(EventKind::Failed, request, None));
149 }
150 }
151 drop(commands);
152 }
153}
154
155impl<T: ICommandInfo> CommandMediator<T> {
157 pub fn subscribe(&self) -> Receiver<T::Event> {
159 self.events.subscribe()
160 }
161}