//! `studiole_command/services/command_mediator.rs`
//!
//! Mediator wiring the command runner, workers and progress reporting
//! together via a shared queue, status map and broadcast event channel.
use crate::prelude::*;
use tokio::sync::broadcast::{Receiver, Sender, channel};

/// Capacity of the broadcast channel carrying command events.
///
/// Sizing rationale:
/// - Must be generous enough that subscribers keep up when many commands
///   are queued in rapid succession.
/// - Too small a buffer causes receivers to see `RecvError::Lagged` and
///   miss events.
/// - Each slot holds only a small event enum, so the memory cost of a
///   large capacity is negligible.
pub(crate) const CHANNEL_CAPACITY: usize = 1024;
11
/// A mediator between the [`CommandRunner`], [`Worker`] and [`CliProgress`] services.
///
/// Shared state is split across independent mutexes so the queue, the
/// status map and the runner status can be locked separately.
pub struct CommandMediator<T: ICommandInfo> {
    /// Broadcast sender for command lifecycle events; subscribers attach
    /// via `subscribe`.
    events: Sender<T::Event>,
    /// FIFO queue of requests awaiting execution.
    queue: Mutex<VecDeque<T::Request>>,
    /// Map of requests to their current status.
    commands: Mutex<HashMap<T::Request, CommandStatus<T>>>,
    /// Wakes workers when new work is queued or the runner status changes.
    notify_workers: Notify,
    /// Current status of the runner.
    runner_status: Mutex<RunnerStatus>,
}
25
26impl<T: ICommandInfo + 'static> Service for CommandMediator<T> {
27    type Error = Infallible;
28
29    async fn from_services(_services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
30        Ok(Self::new())
31    }
32}
33
34impl<T: ICommandInfo> CommandMediator<T> {
35    pub(super) fn new() -> Self {
36        let (events, _) = channel::<T::Event>(CHANNEL_CAPACITY);
37        Self {
38            events,
39            queue: Mutex::default(),
40            notify_workers: Notify::default(),
41            runner_status: Mutex::default(),
42            commands: Mutex::default(),
43        }
44    }
45
46    async fn get_runner_status(&self) -> RunnerStatus {
47        *self.runner_status.lock().await
48    }
49}
50
51// Implementation for `CommandRunner`
52impl<T: ICommandInfo> CommandMediator<T> {
53    pub(super) async fn set_runner_status(&self, status: RunnerStatus) {
54        trace!(?status, "Set runner status");
55        let mut status_guard = self.runner_status.lock().await;
56        *status_guard = status;
57        drop(status_guard);
58        self.notify_workers.notify_waiters();
59    }
60
61    /// Add a command to the queue.
62    ///
63    /// If the request is already queued or executing then it's ignored and `false` is returned.
64    ///
65    /// If added to the queue then progress is updated and subscribers are notified.
66    pub(super) async fn queue(&self, request: T::Request, command: T::Command) -> bool {
67        trace!(?request, "Queueing");
68        let mut commands = self.commands.lock().await;
69        if let Some(CommandStatus::Queued(_) | CommandStatus::Executing) = commands.get(&request) {
70            trace!(?request, "Skipping as already queued or executing");
71            return false;
72        }
73        commands.insert(request.clone(), CommandStatus::Queued(command));
74        drop(commands);
75        let _ = self
76            .events
77            .send(T::Event::new(EventKind::Queued, request.clone(), None));
78        let mut queue = self.queue.lock().await;
79        queue.push_back(request.clone());
80        drop(queue);
81        trace!(?request, "Queued");
82        trace!(?request, "Notifying worker");
83        self.notify_workers.notify_one();
84        true
85    }
86
87    /// Get the commands.
88    ///
89    /// Note: The [`MutexGuard`] must be dropped or the [`Worker`] will be unable to finish
90    /// execution.
91    pub(super) async fn get_commands(
92        &self,
93    ) -> MutexGuard<'_, HashMap<T::Request, CommandStatus<T>>> {
94        self.commands.lock().await
95    }
96}
97
98// Implementation for `Worker`
99impl<T: ICommandInfo> CommandMediator<T> {
100    /// Get the next instruction.
101    #[allow(clippy::panic)]
102    pub(super) async fn get_instruction(&self) -> Instruction<'_, T> {
103        let notify = self.notify_workers.notified();
104        let mut queue_guard = self.queue.lock().await;
105        if self.get_runner_status().await == RunnerStatus::Stopping {
106            return Instruction::Stop;
107        }
108        if let Some(request) = queue_guard.pop_front() {
109            drop(queue_guard);
110            let _ = self
111                .events
112                .send(T::Event::new(EventKind::Executing, request.clone(), None));
113            let mut commands = self.commands.lock().await;
114            let option = commands.insert(request.clone(), CommandStatus::Executing);
115            drop(commands);
116            let Some(CommandStatus::Queued(command)) = option else {
117                panic!("command should be queued but was {option:?}");
118            };
119            return Instruction::Execute(request, command);
120        }
121        drop(queue_guard);
122        if self.get_runner_status().await == RunnerStatus::Draining {
123            return Instruction::Stop;
124        }
125        Instruction::Wait(notify)
126    }
127
128    /// Add the result of a completed execution.
129    pub(super) async fn completed(
130        &self,
131        request: T::Request,
132        result: Result<T::Success, T::Failure>,
133    ) {
134        let mut commands = self.commands.lock().await;
135        match result {
136            Ok(success) => {
137                trace!(?request, "Command succeeded");
138                commands.insert(request.clone(), CommandStatus::Succeeded(success.clone()));
139                let _ =
140                    self.events
141                        .send(T::Event::new(EventKind::Succeeded, request, Some(success)));
142            }
143            Err(failure) => {
144                warn!(?request, error = ?failure, "Command failed");
145                commands.insert(request.clone(), CommandStatus::Failed(failure));
146                let _ = self
147                    .events
148                    .send(T::Event::new(EventKind::Failed, request, None));
149            }
150        }
151        drop(commands);
152    }
153}
154
155// Implementation for event subscribers
156impl<T: ICommandInfo> CommandMediator<T> {
157    /// Subscribe to events.
158    pub fn subscribe(&self) -> Receiver<T::Event> {
159        self.events.subscribe()
160    }
161}