unros_core/service/
mod.rs

1//! Services can be thought of as a long running function whose execution has to be requested through
2//! an API, and its status can be tracked while it is running, and the return value will be provided
3//! back to the service requester.
4
5use tokio::sync::{mpsc, oneshot};
6
7/// The API for a `Service`. This is essentially the public facing end of a `Service`.
8/// 
9/// There are 4 generic types that must be specified:
10/// 
11/// 1. `ScheduleInput` - The input data that the `Service` requires to schedule a task.
12/// 2. `ScheduleError` - The type of the error that may occur during scheduling.
13/// 3. `ScheduleOutput` - The data that the `Service` returned when scheduling is successful.
14/// 4. `TaskOutput` - The data that is returned when the scheduled task is done.
15pub struct ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> {
16    schedule_sender: mpsc::UnboundedSender<
17        ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
18    >,
19}
20
21impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> Clone
22    for ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
23{
24    fn clone(&self) -> Self {
25        Self {
26            schedule_sender: self.schedule_sender.clone(),
27        }
28    }
29}
30
31impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
32    ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
33{
34    /// Try to schedule a task, or return `None` if the `Service` was dropped.
35    pub async fn try_schedule_or_closed(
36        &self,
37        input: ScheduleInput,
38    ) -> Option<Result<ScheduledService<ScheduleOutput, TaskOutput>, ScheduleError>> {
39        let (sender, receiver) = oneshot::channel();
40        self.schedule_sender
41            .send(ScheduleRequest {
42                input: Some(input),
43                sender,
44            })
45            .ok()?;
46        receiver.await.ok()
47    }
48
49    /// Try to schedule a task, awaiting forever if the `Service` was dropped.
50    pub async fn try_schedule(
51        &self,
52        input: ScheduleInput,
53    ) -> Result<ScheduledService<ScheduleOutput, TaskOutput>, ScheduleError> {
54        let Some(out) = self.try_schedule_or_closed(input).await else {
55            std::future::pending::<()>().await;
56            unreachable!()
57        };
58        out
59    }
60}
61
62/// Represents a task that was successfully scheduled.
63/// 
64/// This is used by service requesters.
65pub struct ScheduledService<ScheduleOutput, TaskOutput> {
66    data: Option<ScheduleOutput>,
67    output_recv: oneshot::Receiver<TaskOutput>,
68}
69
70/// Represents a request to schedule a task.
71/// 
72/// This is used by service providers.
73pub struct ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> {
74    input: Option<ScheduleInput>,
75    sender: oneshot::Sender<Result<ScheduledService<ScheduleOutput, TaskOutput>, ScheduleError>>,
76}
77
78/// A scheduled task that is awaiting output.
79/// 
80/// This is used by service providers.
81pub struct Pending<TaskOutput> {
82    output_sender: oneshot::Sender<TaskOutput>,
83}
84
85/// The private end of a `Service`.
86/// 
87/// Users will access this `Service` through its `ServiceHandle`. As such, no references
88/// to this should be shared.
89/// 
90/// There are 4 generic types that must be specified:
91/// 
92/// 1. `ScheduleInput` - The input data that the `Service` requires to schedule a task.
93/// 2. `ScheduleError` - The type of the error that may occur during scheduling.
94/// 3. `ScheduleOutput` - The data that the `Service` returned when scheduling is successful.
95/// 4. `TaskOutput` - The data that is returned when the scheduled task is done.
96pub struct Service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> {
97    schedule_recv: mpsc::UnboundedReceiver<
98        ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
99    >,
100}
101
102/// Creates a new `Service` and `ServiceHandle` that are tied together.
103/// 
104/// There are 4 generic types that must be specified:
105/// 
106/// 1. `ScheduleInput` - The input data that the `Service` requires to schedule a task.
107/// 2. `ScheduleError` - The type of the error that may occur during scheduling.
108/// 3. `ScheduleOutput` - The data that the `Service` returned when scheduling is successful.
109/// 4. `TaskOutput` - The data that is returned when the scheduled task is done.
110#[must_use]
111pub fn new_service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>() -> (
112    Service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
113    ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
114) {
115    let (schedule_sender, schedule_recv) = mpsc::unbounded_channel();
116    (Service { schedule_recv }, ServiceHandle { schedule_sender })
117}
118
119impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
120    Service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
121{
122    /// Waits for a new scheduling request.
123    pub async fn wait_for_request(
124        &mut self,
125    ) -> Option<ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>> {
126        self.schedule_recv.recv().await
127    }
128
129    /// Waits for a new scheduling request in a blocking way.
130    pub fn blocking_wait_for_request(
131        &mut self,
132    ) -> Option<ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>> {
133        self.schedule_recv.blocking_recv()
134    }
135}
136
137impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
138    ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
139{
140    /// Accepts this `ScheduleRequest`, indicating to the requester that the task
141    /// will now start processing.
142    /// 
143    /// The given data will be sent to the requester.
144    pub fn accept(self, data: ScheduleOutput) -> Option<Pending<TaskOutput>> {
145        let (output_sender, output_recv) = oneshot::channel();
146        if self
147            .sender
148            .send(Ok(ScheduledService {
149                data: Some(data),
150                output_recv,
151            }))
152            .is_err()
153        {
154            None
155        } else {
156            Some(Pending { output_sender })
157        }
158    }
159
160    /// Rejects the `ScheduleRequest` with the given error that the requester will receive.
161    pub fn reject(self, error: ScheduleError) {
162        let _ = self.sender.send(Err(error));
163    }
164
165    /// Get a reference to the `ScheduleInput`, assuming it has not been taken yet.
166    pub fn get_input(&self) -> Option<&ScheduleInput> {
167        self.input.as_ref()
168    }
169
170    /// Get a mutable reference to the `ScheduleInput`, assuming it has not been taken yet.
171    pub fn get_mut_input(&mut self) -> Option<&mut ScheduleInput> {
172        self.input.as_mut()
173    }
174
175    /// Takes the `ScheduleInput`.
176    /// 
177    /// Subsequent calls to this and the various `get_*` methods will return `None`.
178    pub fn take_input(&mut self) -> Option<ScheduleInput> {
179        self.input.take()
180    }
181}
182
183impl<TaskOutput> Pending<TaskOutput> {
184    /// Finishes this task by providing its `TaskOutput`.
185    /// 
186    /// It is not guaranteed that the requester will receive this, since
187    /// the requester may drop their handle before this method is called.
188    pub fn finish(self, output: TaskOutput) {
189        let _ = self.output_sender.send(output);
190    }
191
192    /// Checks if the requester has dropped their handle to the task.
193    /// 
194    /// Do not rely on this to guarantee that the requester will receive
195    /// the `TaskOutput` due to TOCTOU issues.
196    #[must_use]
197    pub fn is_closed(&self) -> bool {
198        self.output_sender.is_closed()
199    }
200}
201
202impl<ScheduleOutput, TaskOutput> ScheduledService<ScheduleOutput, TaskOutput> {
203    /// Get a reference to the `ScheduleOutput`, assuming it has not been taken yet.
204    pub fn get_data(&self) -> Option<&ScheduleOutput> {
205        self.data.as_ref()
206    }
207
208    /// Get a mutable reference to the `ScheduleOutput`, assuming it has not been taken yet.
209    pub fn get_mut_data(&mut self) -> Option<&mut ScheduleOutput> {
210        self.data.as_mut()
211    }
212
213    /// Takes the `ScheduleOutput`.
214    /// 
215    /// Subsequent calls to this and the various `get_*` methods will return `None`.
216    pub fn take_data(&mut self) -> Option<ScheduleOutput> {
217        self.data.take()
218    }
219
220    /// Waits for the `TaskOutput`, or returns `None` if the `Service` was dropped.
221    pub async fn wait_or_closed(self) -> Option<TaskOutput> {
222        self.output_recv.await.ok()
223    }
224
225    /// Waits for the `TaskOutput`, awaiting forever if the `Service` was dropped.
226    pub async fn wait(self) -> TaskOutput {
227        let Some(out) = self.wait_or_closed().await else {
228            std::future::pending::<()>().await;
229            unreachable!()
230        };
231        out
232    }
233}