decthings_api/client/rpc/spawned/
mod.rs

1mod request;
2mod response;
3
4use crate::client::StateModification;
5
6pub use request::*;
7pub use response::*;
8
/// Entry point for the methods of the `Spawned` RPC API.
///
/// Every method forwards its parameters to the wrapped RPC handle using the API name
/// `"Spawned"`.
pub struct SpawnedRpc {
    /// RPC handle used to perform the raw method calls.
    rpc: crate::client::DecthingsClientRpc,
}
12
13impl SpawnedRpc {
14    pub(crate) fn new(rpc: crate::client::DecthingsClientRpc) -> Self {
15        Self { rpc }
16    }
17
18    pub async fn spawn_command(
19        &self,
20        params: SpawnCommandParams<'_, impl AsRef<str>>,
21    ) -> Result<SpawnCommandResult, crate::client::DecthingsRpcError<SpawnCommandError>> {
22        #[cfg(feature = "events")]
23        let subscribe_to_events = params.subscribe_to_events != Some(false);
24
25        #[cfg(feature = "events")]
26        let protocol = if subscribe_to_events {
27            crate::client::RpcProtocol::Ws
28        } else {
29            crate::client::RpcProtocol::Http
30        };
31
32        #[cfg(not(feature = "events"))]
33        let protocol = crate::client::RpcProtocol::Http;
34
35        let (tx, rx) = tokio::sync::oneshot::channel();
36        self.rpc
37            .raw_method_call::<_, _, &[u8]>(
38                "Spawned",
39                "spawnCommand",
40                params,
41                &[],
42                protocol,
43                move |x| {
44                    match x {
45                        Ok(val) => {
46                            let res: Result<
47                                super::Response<SpawnCommandResult, SpawnCommandError>,
48                                crate::client::DecthingsRpcError<SpawnCommandError>,
49                            > = serde_json::from_slice(&val.0).map_err(Into::into);
50                            match res {
51                                Ok(super::Response::Result(val)) => {
52                                    #[cfg(feature = "events")]
53                                    let spawned_command_id = val.spawned_command_id.clone();
54
55                                    tx.send(Ok(val)).ok();
56
57                                    #[cfg(feature = "events")]
58                                    if subscribe_to_events {
59                                        return StateModification {
60                                            add_events: vec![spawned_command_id],
61                                            remove_events: vec![],
62                                        };
63                                    }
64                                }
65                                Ok(super::Response::Error(val)) => {
66                                    tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
67                                        .ok();
68                                }
69                                Err(e) => {
70                                    tx.send(Err(e)).ok();
71                                }
72                            }
73                        }
74                        Err(err) => {
75                            tx.send(Err(err.into())).ok();
76                        }
77                    }
78                    StateModification::empty()
79                },
80            )
81            .await;
82        rx.await.unwrap()
83    }
84
85    pub async fn spawn_command_for_model(
86        &self,
87        params: SpawnCommandForModelParams<'_, impl AsRef<str>>,
88    ) -> Result<
89        SpawnCommandForModelResult,
90        crate::client::DecthingsRpcError<SpawnCommandForModelError>,
91    > {
92        #[cfg(feature = "events")]
93        let subscribe_to_events = params.subscribe_to_events != Some(false);
94
95        #[cfg(feature = "events")]
96        let protocol = if subscribe_to_events {
97            crate::client::RpcProtocol::Ws
98        } else {
99            crate::client::RpcProtocol::Http
100        };
101
102        #[cfg(not(feature = "events"))]
103        let protocol = crate::client::RpcProtocol::Http;
104
105        let (tx, rx) = tokio::sync::oneshot::channel();
106        self.rpc
107            .raw_method_call::<_, _, &[u8]>(
108                "Spawned",
109                "spawnCommandForModel",
110                params,
111                &[],
112                protocol,
113                move |x| {
114                    match x {
115                        Ok(val) => {
116                            let res: Result<
117                                super::Response<
118                                    SpawnCommandForModelResult,
119                                    SpawnCommandForModelError,
120                                >,
121                                crate::client::DecthingsRpcError<SpawnCommandForModelError>,
122                            > = serde_json::from_slice(&val.0).map_err(Into::into);
123                            match res {
124                                Ok(super::Response::Result(val)) => {
125                                    #[cfg(feature = "events")]
126                                    let spawned_command_id = val.spawned_command_id.clone();
127
128                                    tx.send(Ok(val)).ok();
129
130                                    #[cfg(feature = "events")]
131                                    if subscribe_to_events {
132                                        return StateModification {
133                                            add_events: vec![spawned_command_id],
134                                            remove_events: vec![],
135                                        };
136                                    }
137                                }
138                                Ok(super::Response::Error(val)) => {
139                                    tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
140                                        .ok();
141                                }
142                                Err(e) => {
143                                    tx.send(Err(e)).ok();
144                                }
145                            }
146                        }
147                        Err(err) => {
148                            tx.send(Err(err.into())).ok();
149                        }
150                    }
151                    StateModification::empty()
152                },
153            )
154            .await;
155        rx.await.unwrap()
156    }
157
158    pub async fn terminate_spawned_command(
159        &self,
160        params: TerminateSpawnedCommandParams<'_>,
161    ) -> Result<
162        TerminateSpawnedCommandResult,
163        crate::client::DecthingsRpcError<TerminateSpawnedCommandError>,
164    > {
165        #[cfg(feature = "events")]
166        let spawned_command_id_owned = params.spawned_command_id.to_owned();
167
168        let (tx, rx) = tokio::sync::oneshot::channel();
169        self.rpc
170            .raw_method_call::<_, _, &[u8]>(
171                "Spawned",
172                "terminateSpawnedCommand",
173                params,
174                &[],
175                crate::client::RpcProtocol::Http,
176                move |x| {
177                    match x {
178                        Ok(val) => {
179                            let res: Result<
180                                super::Response<
181                                    TerminateSpawnedCommandResult,
182                                    TerminateSpawnedCommandError,
183                                >,
184                                crate::client::DecthingsRpcError<TerminateSpawnedCommandError>,
185                            > = serde_json::from_slice(&val.0).map_err(Into::into);
186                            match res {
187                                Ok(super::Response::Result(val)) => {
188                                    tx.send(Ok(val)).ok();
189
190                                    #[cfg(feature = "events")]
191                                    return StateModification {
192                                        add_events: vec![],
193                                        remove_events: vec![spawned_command_id_owned],
194                                    };
195                                }
196                                Ok(super::Response::Error(val)) => {
197                                    tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
198                                        .ok();
199                                }
200                                Err(e) => {
201                                    tx.send(Err(e)).ok();
202                                }
203                            }
204                        }
205                        Err(err) => {
206                            tx.send(Err(err.into())).ok();
207                        }
208                    }
209                    StateModification::empty()
210                },
211            )
212            .await;
213        rx.await.unwrap()
214    }
215
216    pub async fn get_spawned_commands(
217        &self,
218        params: GetSpawnedCommandsParams<'_, impl AsRef<str>>,
219    ) -> Result<GetSpawnedCommandsResult, crate::client::DecthingsRpcError<GetSpawnedCommandsError>>
220    {
221        let (tx, rx) = tokio::sync::oneshot::channel();
222        self.rpc
223            .raw_method_call::<_, _, &[u8]>(
224                "Spawned",
225                "getSpawnedCommands",
226                params,
227                &[],
228                crate::client::RpcProtocol::Http,
229                |x| {
230                    tx.send(x).ok();
231                    StateModification::empty()
232                },
233            )
234            .await;
235        rx.await
236            .unwrap()
237            .map_err(crate::client::DecthingsRpcError::Request)
238            .and_then(|x| {
239                let res: super::Response<GetSpawnedCommandsResult, GetSpawnedCommandsError> =
240                    serde_json::from_slice(&x.0)?;
241                match res {
242                    super::Response::Result(val) => Ok(val),
243                    super::Response::Error(val) => Err(crate::client::DecthingsRpcError::Rpc(val)),
244                }
245            })
246    }
247
248    pub async fn write_to_spawned_command(
249        &self,
250        params: WriteToSpawnedCommandParams<'_, impl AsRef<[u8]>>,
251    ) -> Result<
252        WriteToSpawnedCommandResult,
253        crate::client::DecthingsRpcError<WriteToSpawnedCommandError>,
254    > {
255        let (tx, rx) = tokio::sync::oneshot::channel();
256        self.rpc
257            .raw_method_call(
258                "Spawned",
259                "writeToSpawnedCommand",
260                &params,
261                &[&params.data],
262                crate::client::RpcProtocol::Http,
263                |x| {
264                    tx.send(x).ok();
265                    StateModification::empty()
266                },
267            )
268            .await;
269        rx.await
270            .unwrap()
271            .map_err(crate::client::DecthingsRpcError::Request)
272            .and_then(|x| {
273                let res: super::Response<WriteToSpawnedCommandResult, WriteToSpawnedCommandError> =
274                    serde_json::from_slice(&x.0)?;
275                match res {
276                    super::Response::Result(val) => Ok(val),
277                    super::Response::Error(val) => Err(crate::client::DecthingsRpcError::Rpc(val)),
278                }
279            })
280    }
281
282    #[cfg(feature = "events")]
283    pub async fn subscribe_to_events(
284        &self,
285        params: SpawnedSubscribeToEventsParams<'_>,
286    ) -> Result<
287        SpawnedSubscribeToEventsResult,
288        crate::client::DecthingsRpcError<SpawnedSubscribeToEventsError>,
289    > {
290        let (tx, rx) = tokio::sync::oneshot::channel();
291        let spawned_command_id_owned = params.spawned_command_id.to_owned();
292        self.rpc
293            .raw_method_call::<_, _, &[u8]>(
294                "Spawned",
295                "subscribeToEvents",
296                params,
297                &[],
298                crate::client::RpcProtocol::Ws,
299                move |x| {
300                    match x {
301                        Ok(val) => {
302                            let res: Result<
303                                super::Response<
304                                    SpawnedSubscribeToEventsResult,
305                                    SpawnedSubscribeToEventsError,
306                                >,
307                                crate::client::DecthingsRpcError<SpawnedSubscribeToEventsError>,
308                            > = serde_json::from_slice(&val.0).map_err(Into::into);
309                            match res {
310                                Ok(super::Response::Result(val)) => {
311                                    tx.send(Ok(val)).ok();
312                                    return StateModification {
313                                        add_events: vec![spawned_command_id_owned],
314                                        remove_events: vec![],
315                                    };
316                                }
317                                Ok(super::Response::Error(val)) => {
318                                    tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
319                                        .ok();
320                                }
321                                Err(e) => {
322                                    tx.send(Err(e)).ok();
323                                }
324                            }
325                        }
326                        Err(err) => {
327                            tx.send(Err(err.into())).ok();
328                        }
329                    }
330                    StateModification::empty()
331                },
332            )
333            .await;
334        rx.await.unwrap()
335    }
336
337    #[cfg(feature = "events")]
338    pub async fn unsubscribe_from_events(
339        &self,
340        params: SpawnedUnsubscribeFromEventsParams<'_>,
341    ) -> Result<
342        SpawnedUnsubscribeFromEventsResult,
343        crate::client::DecthingsRpcError<SpawnedUnsubscribeFromEventsError>,
344    > {
345        let (tx, rx) = tokio::sync::oneshot::channel();
346        let spawned_command_id_owned = params.spawned_command_id.to_owned();
347        let did_call = self
348            .rpc
349            .raw_method_call::<_, _, &[u8]>(
350                "Spawned",
351                "unsubscribeFromEvents",
352                params,
353                &[],
354                crate::client::RpcProtocol::WsIfAvailableOtherwiseNone,
355                move |x| {
356                    match x {
357                        Ok(val) => {
358                            let res: Result<
359                                super::Response<
360                                    SpawnedUnsubscribeFromEventsResult,
361                                    SpawnedUnsubscribeFromEventsError,
362                                >,
363                                crate::client::DecthingsRpcError<SpawnedUnsubscribeFromEventsError>,
364                            > = serde_json::from_slice(&val.0).map_err(Into::into);
365                            match res {
366                                Ok(super::Response::Result(val)) => {
367                                    tx.send(Ok(val)).ok();
368                                    return StateModification {
369                                        add_events: vec![],
370                                        remove_events: vec![spawned_command_id_owned],
371                                    };
372                                }
373                                Ok(super::Response::Error(val)) => {
374                                    tx.send(Err(crate::client::DecthingsRpcError::Rpc(val)))
375                                        .ok();
376                                }
377                                Err(e) => {
378                                    tx.send(Err(e)).ok();
379                                }
380                            }
381                        }
382                        Err(err) => {
383                            tx.send(Err(err.into())).ok();
384                        }
385                    }
386                    StateModification::empty()
387                },
388            )
389            .await;
390        if !did_call {
391            return Err(crate::client::DecthingsRpcError::Rpc(
392                SpawnedUnsubscribeFromEventsError::NotSubscribed,
393            ));
394        }
395        rx.await.unwrap()
396    }
397}