1use tokio::sync::{mpsc, oneshot};
6
7pub struct ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> {
16 schedule_sender: mpsc::UnboundedSender<
17 ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
18 >,
19}
20
21impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> Clone
22 for ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
23{
24 fn clone(&self) -> Self {
25 Self {
26 schedule_sender: self.schedule_sender.clone(),
27 }
28 }
29}
30
31impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
32 ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
33{
34 pub async fn try_schedule_or_closed(
36 &self,
37 input: ScheduleInput,
38 ) -> Option<Result<ScheduledService<ScheduleOutput, TaskOutput>, ScheduleError>> {
39 let (sender, receiver) = oneshot::channel();
40 self.schedule_sender
41 .send(ScheduleRequest {
42 input: Some(input),
43 sender,
44 })
45 .ok()?;
46 receiver.await.ok()
47 }
48
49 pub async fn try_schedule(
51 &self,
52 input: ScheduleInput,
53 ) -> Result<ScheduledService<ScheduleOutput, TaskOutput>, ScheduleError> {
54 let Some(out) = self.try_schedule_or_closed(input).await else {
55 std::future::pending::<()>().await;
56 unreachable!()
57 };
58 out
59 }
60}
61
62pub struct ScheduledService<ScheduleOutput, TaskOutput> {
66 data: Option<ScheduleOutput>,
67 output_recv: oneshot::Receiver<TaskOutput>,
68}
69
70pub struct ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> {
74 input: Option<ScheduleInput>,
75 sender: oneshot::Sender<Result<ScheduledService<ScheduleOutput, TaskOutput>, ScheduleError>>,
76}
77
78pub struct Pending<TaskOutput> {
82 output_sender: oneshot::Sender<TaskOutput>,
83}
84
85pub struct Service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput> {
97 schedule_recv: mpsc::UnboundedReceiver<
98 ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
99 >,
100}
101
102#[must_use]
111pub fn new_service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>() -> (
112 Service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
113 ServiceHandle<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>,
114) {
115 let (schedule_sender, schedule_recv) = mpsc::unbounded_channel();
116 (Service { schedule_recv }, ServiceHandle { schedule_sender })
117}
118
119impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
120 Service<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
121{
122 pub async fn wait_for_request(
124 &mut self,
125 ) -> Option<ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>> {
126 self.schedule_recv.recv().await
127 }
128
129 pub fn blocking_wait_for_request(
131 &mut self,
132 ) -> Option<ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>> {
133 self.schedule_recv.blocking_recv()
134 }
135}
136
137impl<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
138 ScheduleRequest<ScheduleInput, ScheduleError, ScheduleOutput, TaskOutput>
139{
140 pub fn accept(self, data: ScheduleOutput) -> Option<Pending<TaskOutput>> {
145 let (output_sender, output_recv) = oneshot::channel();
146 if self
147 .sender
148 .send(Ok(ScheduledService {
149 data: Some(data),
150 output_recv,
151 }))
152 .is_err()
153 {
154 None
155 } else {
156 Some(Pending { output_sender })
157 }
158 }
159
160 pub fn reject(self, error: ScheduleError) {
162 let _ = self.sender.send(Err(error));
163 }
164
165 pub fn get_input(&self) -> Option<&ScheduleInput> {
167 self.input.as_ref()
168 }
169
170 pub fn get_mut_input(&mut self) -> Option<&mut ScheduleInput> {
172 self.input.as_mut()
173 }
174
175 pub fn take_input(&mut self) -> Option<ScheduleInput> {
179 self.input.take()
180 }
181}
182
183impl<TaskOutput> Pending<TaskOutput> {
184 pub fn finish(self, output: TaskOutput) {
189 let _ = self.output_sender.send(output);
190 }
191
192 #[must_use]
197 pub fn is_closed(&self) -> bool {
198 self.output_sender.is_closed()
199 }
200}
201
202impl<ScheduleOutput, TaskOutput> ScheduledService<ScheduleOutput, TaskOutput> {
203 pub fn get_data(&self) -> Option<&ScheduleOutput> {
205 self.data.as_ref()
206 }
207
208 pub fn get_mut_data(&mut self) -> Option<&mut ScheduleOutput> {
210 self.data.as_mut()
211 }
212
213 pub fn take_data(&mut self) -> Option<ScheduleOutput> {
217 self.data.take()
218 }
219
220 pub async fn wait_or_closed(self) -> Option<TaskOutput> {
222 self.output_recv.await.ok()
223 }
224
225 pub async fn wait(self) -> TaskOutput {
227 let Some(out) = self.wait_or_closed().await else {
228 std::future::pending::<()>().await;
229 unreachable!()
230 };
231 out
232 }
233}