decthings_api/client/rpc/spawned/
mod.rs1mod request;
2mod response;
3
4use crate::client::StateModification;
5
6pub use request::*;
7pub use response::*;
8
9pub struct SpawnedRpc {
10 rpc: crate::client::DecthingsClientRpc,
11}
12
13impl SpawnedRpc {
14 pub(crate) fn new(rpc: crate::client::DecthingsClientRpc) -> Self {
15 Self { rpc }
16 }
17
18 pub async fn spawn_command(
19 &self,
20 params: SpawnCommandParams<'_, impl AsRef<str>>,
21 ) -> Result<SpawnCommandResult, crate::client::DecthingsRpcError<SpawnCommandError>> {
22 #[cfg(feature = "events")]
23 let subscribe_to_events = params.subscribe_to_events != Some(false);
24
25 #[cfg(feature = "events")]
26 let protocol = if subscribe_to_events {
27 crate::client::RpcProtocol::Ws
28 } else {
29 crate::client::RpcProtocol::Http
30 };
31
32 #[cfg(not(feature = "events"))]
33 let protocol = crate::client::RpcProtocol::Http;
34
35 let (tx, rx) = tokio::sync::oneshot::channel();
36 self.rpc
37 .raw_method_call::<_, _, &[u8]>(
38 "Spawned",
39 "spawnCommand",
40 params,
41 &[],
42 protocol,
43 move |x| {
44 match x {
45 Ok(val) => {
46 let res: Result<
47 super::Response<SpawnCommandResult, SpawnCommandError>,
48 crate::client::DecthingsRpcError<SpawnCommandError>,
49 > = serde_json::from_slice(&val.0).map_err(Into::into);
50 match res {
51 Ok(super::Response::Result(val)) => {
52 #[cfg(feature = "events")]
53 let spawned_command_id = val.spawned_command_id.clone();
54
55 tx.send(Ok(val)).ok();
56
57 #[cfg(feature = "events")]
58 if subscribe_to_events {
59 return StateModification {
60 add_events: vec![spawned_command_id],
61 remove_events: vec![],
62 };
63 }
64 }
65 Ok(super::Response::Error(val)) => {
66 tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
67 .ok();
68 }
69 Err(e) => {
70 tx.send(Err(e)).ok();
71 }
72 }
73 }
74 Err(err) => {
75 tx.send(Err(err.into())).ok();
76 }
77 }
78 StateModification::empty()
79 },
80 )
81 .await;
82 rx.await.unwrap()
83 }
84
85 pub async fn spawn_command_for_model(
86 &self,
87 params: SpawnCommandForModelParams<'_, impl AsRef<str>>,
88 ) -> Result<
89 SpawnCommandForModelResult,
90 crate::client::DecthingsRpcError<SpawnCommandForModelError>,
91 > {
92 #[cfg(feature = "events")]
93 let subscribe_to_events = params.subscribe_to_events != Some(false);
94
95 #[cfg(feature = "events")]
96 let protocol = if subscribe_to_events {
97 crate::client::RpcProtocol::Ws
98 } else {
99 crate::client::RpcProtocol::Http
100 };
101
102 #[cfg(not(feature = "events"))]
103 let protocol = crate::client::RpcProtocol::Http;
104
105 let (tx, rx) = tokio::sync::oneshot::channel();
106 self.rpc
107 .raw_method_call::<_, _, &[u8]>(
108 "Spawned",
109 "spawnCommandForModel",
110 params,
111 &[],
112 protocol,
113 move |x| {
114 match x {
115 Ok(val) => {
116 let res: Result<
117 super::Response<
118 SpawnCommandForModelResult,
119 SpawnCommandForModelError,
120 >,
121 crate::client::DecthingsRpcError<SpawnCommandForModelError>,
122 > = serde_json::from_slice(&val.0).map_err(Into::into);
123 match res {
124 Ok(super::Response::Result(val)) => {
125 #[cfg(feature = "events")]
126 let spawned_command_id = val.spawned_command_id.clone();
127
128 tx.send(Ok(val)).ok();
129
130 #[cfg(feature = "events")]
131 if subscribe_to_events {
132 return StateModification {
133 add_events: vec![spawned_command_id],
134 remove_events: vec![],
135 };
136 }
137 }
138 Ok(super::Response::Error(val)) => {
139 tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
140 .ok();
141 }
142 Err(e) => {
143 tx.send(Err(e)).ok();
144 }
145 }
146 }
147 Err(err) => {
148 tx.send(Err(err.into())).ok();
149 }
150 }
151 StateModification::empty()
152 },
153 )
154 .await;
155 rx.await.unwrap()
156 }
157
158 pub async fn terminate_spawned_command(
159 &self,
160 params: TerminateSpawnedCommandParams<'_>,
161 ) -> Result<
162 TerminateSpawnedCommandResult,
163 crate::client::DecthingsRpcError<TerminateSpawnedCommandError>,
164 > {
165 #[cfg(feature = "events")]
166 let spawned_command_id_owned = params.spawned_command_id.to_owned();
167
168 let (tx, rx) = tokio::sync::oneshot::channel();
169 self.rpc
170 .raw_method_call::<_, _, &[u8]>(
171 "Spawned",
172 "terminateSpawnedCommand",
173 params,
174 &[],
175 crate::client::RpcProtocol::Http,
176 move |x| {
177 match x {
178 Ok(val) => {
179 let res: Result<
180 super::Response<
181 TerminateSpawnedCommandResult,
182 TerminateSpawnedCommandError,
183 >,
184 crate::client::DecthingsRpcError<TerminateSpawnedCommandError>,
185 > = serde_json::from_slice(&val.0).map_err(Into::into);
186 match res {
187 Ok(super::Response::Result(val)) => {
188 tx.send(Ok(val)).ok();
189
190 #[cfg(feature = "events")]
191 return StateModification {
192 add_events: vec![],
193 remove_events: vec![spawned_command_id_owned],
194 };
195 }
196 Ok(super::Response::Error(val)) => {
197 tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
198 .ok();
199 }
200 Err(e) => {
201 tx.send(Err(e)).ok();
202 }
203 }
204 }
205 Err(err) => {
206 tx.send(Err(err.into())).ok();
207 }
208 }
209 StateModification::empty()
210 },
211 )
212 .await;
213 rx.await.unwrap()
214 }
215
216 pub async fn get_spawned_commands(
217 &self,
218 params: GetSpawnedCommandsParams<'_, impl AsRef<str>>,
219 ) -> Result<GetSpawnedCommandsResult, crate::client::DecthingsRpcError<GetSpawnedCommandsError>>
220 {
221 let (tx, rx) = tokio::sync::oneshot::channel();
222 self.rpc
223 .raw_method_call::<_, _, &[u8]>(
224 "Spawned",
225 "getSpawnedCommands",
226 params,
227 &[],
228 crate::client::RpcProtocol::Http,
229 |x| {
230 tx.send(x).ok();
231 StateModification::empty()
232 },
233 )
234 .await;
235 rx.await
236 .unwrap()
237 .map_err(crate::client::DecthingsRpcError::Request)
238 .and_then(|x| {
239 let res: super::Response<GetSpawnedCommandsResult, GetSpawnedCommandsError> =
240 serde_json::from_slice(&x.0)?;
241 match res {
242 super::Response::Result(val) => Ok(val),
243 super::Response::Error(val) => Err(crate::client::DecthingsRpcError::Rpc(val)),
244 }
245 })
246 }
247
248 pub async fn write_to_spawned_command(
249 &self,
250 params: WriteToSpawnedCommandParams<'_, impl AsRef<[u8]>>,
251 ) -> Result<
252 WriteToSpawnedCommandResult,
253 crate::client::DecthingsRpcError<WriteToSpawnedCommandError>,
254 > {
255 let (tx, rx) = tokio::sync::oneshot::channel();
256 self.rpc
257 .raw_method_call(
258 "Spawned",
259 "writeToSpawnedCommand",
260 ¶ms,
261 &[¶ms.data],
262 crate::client::RpcProtocol::Http,
263 |x| {
264 tx.send(x).ok();
265 StateModification::empty()
266 },
267 )
268 .await;
269 rx.await
270 .unwrap()
271 .map_err(crate::client::DecthingsRpcError::Request)
272 .and_then(|x| {
273 let res: super::Response<WriteToSpawnedCommandResult, WriteToSpawnedCommandError> =
274 serde_json::from_slice(&x.0)?;
275 match res {
276 super::Response::Result(val) => Ok(val),
277 super::Response::Error(val) => Err(crate::client::DecthingsRpcError::Rpc(val)),
278 }
279 })
280 }
281
282 #[cfg(feature = "events")]
283 pub async fn subscribe_to_events(
284 &self,
285 params: SpawnedSubscribeToEventsParams<'_>,
286 ) -> Result<
287 SpawnedSubscribeToEventsResult,
288 crate::client::DecthingsRpcError<SpawnedSubscribeToEventsError>,
289 > {
290 let (tx, rx) = tokio::sync::oneshot::channel();
291 let spawned_command_id_owned = params.spawned_command_id.to_owned();
292 self.rpc
293 .raw_method_call::<_, _, &[u8]>(
294 "Spawned",
295 "subscribeToEvents",
296 params,
297 &[],
298 crate::client::RpcProtocol::Ws,
299 move |x| {
300 match x {
301 Ok(val) => {
302 let res: Result<
303 super::Response<
304 SpawnedSubscribeToEventsResult,
305 SpawnedSubscribeToEventsError,
306 >,
307 crate::client::DecthingsRpcError<SpawnedSubscribeToEventsError>,
308 > = serde_json::from_slice(&val.0).map_err(Into::into);
309 match res {
310 Ok(super::Response::Result(val)) => {
311 tx.send(Ok(val)).ok();
312 return StateModification {
313 add_events: vec![spawned_command_id_owned],
314 remove_events: vec![],
315 };
316 }
317 Ok(super::Response::Error(val)) => {
318 tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
319 .ok();
320 }
321 Err(e) => {
322 tx.send(Err(e)).ok();
323 }
324 }
325 }
326 Err(err) => {
327 tx.send(Err(err.into())).ok();
328 }
329 }
330 StateModification::empty()
331 },
332 )
333 .await;
334 rx.await.unwrap()
335 }
336
337 #[cfg(feature = "events")]
338 pub async fn unsubscribe_from_events(
339 &self,
340 params: SpawnedUnsubscribeFromEventsParams<'_>,
341 ) -> Result<
342 SpawnedUnsubscribeFromEventsResult,
343 crate::client::DecthingsRpcError<SpawnedUnsubscribeFromEventsError>,
344 > {
345 let (tx, rx) = tokio::sync::oneshot::channel();
346 let spawned_command_id_owned = params.spawned_command_id.to_owned();
347 let did_call = self
348 .rpc
349 .raw_method_call::<_, _, &[u8]>(
350 "Spawned",
351 "unsubscribeFromEvents",
352 params,
353 &[],
354 crate::client::RpcProtocol::WsIfAvailableOtherwiseNone,
355 move |x| {
356 match x {
357 Ok(val) => {
358 let res: Result<
359 super::Response<
360 SpawnedUnsubscribeFromEventsResult,
361 SpawnedUnsubscribeFromEventsError,
362 >,
363 crate::client::DecthingsRpcError<SpawnedUnsubscribeFromEventsError>,
364 > = serde_json::from_slice(&val.0).map_err(Into::into);
365 match res {
366 Ok(super::Response::Result(val)) => {
367 tx.send(Ok(val)).ok();
368 return StateModification {
369 add_events: vec![],
370 remove_events: vec![spawned_command_id_owned],
371 };
372 }
373 Ok(super::Response::Error(val)) => {
374 tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
375 .ok();
376 }
377 Err(e) => {
378 tx.send(Err(e)).ok();
379 }
380 }
381 }
382 Err(err) => {
383 tx.send(Err(err.into())).ok();
384 }
385 }
386 StateModification::empty()
387 },
388 )
389 .await;
390 if !did_call {
391 return Err(crate::client::DecthingsRpcError::Rpc(
392 SpawnedUnsubscribeFromEventsError::NotSubscribed,
393 ));
394 }
395 rx.await.unwrap()
396 }
397}