iotics_grpc_client/
input.rs

1use anyhow::Context;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4use tonic::transport::Channel;
5
6use crate::client::iotics::api::input_api_client::InputApiClient;
7use crate::client::iotics::api::{
8    delete_input_request, describe_input_request, receive_input_message_request,
9    DeleteInputResponse, DescribeInputResponse,
10};
11
12pub use crate::client::iotics::api::{
13    DeleteInputRequest, DescribeInputRequest, Headers, InputId, ReceiveInputMessageRequest,
14    ReceiveInputMessageResponse,
15};
16
17use crate::auth_builder::IntoAuthBuilder;
18use crate::channel::create_channel;
19use crate::helpers::generate_client_app_id;
20
21pub async fn receive_input_messages(
22    auth_builder: Arc<impl IntoAuthBuilder>,
23    twin_id: &str,
24    input_id: &str,
25) -> Result<mpsc::Receiver<Result<Vec<u8>, anyhow::Error>>, anyhow::Error> {
26    let channel = create_channel(auth_builder.clone(), None, None, None).await?;
27    receive_input_messages_with_channel(auth_builder, channel, twin_id, input_id).await
28}
29
30pub async fn receive_input_messages_with_channel(
31    auth_builder: Arc<impl IntoAuthBuilder>,
32    channel: Channel,
33    twin_id: &str,
34    input_id: &str,
35) -> Result<mpsc::Receiver<Result<Vec<u8>, anyhow::Error>>, anyhow::Error> {
36    let mut client = InputApiClient::new(channel);
37    let client_app_id = generate_client_app_id();
38    let transaction_ref = vec![client_app_id.clone()];
39
40    let headers = Headers {
41        client_app_id,
42        transaction_ref,
43        ..Default::default()
44    };
45
46    let mut request = tonic::Request::new(ReceiveInputMessageRequest {
47        headers: Some(headers),
48        args: Some(receive_input_message_request::Arguments {
49            input_id: Some(InputId {
50                id: input_id.to_string(),
51                twin_id: twin_id.to_string(),
52                ..Default::default()
53            }),
54        }),
55    });
56
57    let token = auth_builder.get_token()?;
58
59    request.metadata_mut().append(
60        "authorization",
61        token.parse().context("parse token failed")?,
62    );
63
64    let (tx, rx) = mpsc::channel::<Result<Vec<u8>, anyhow::Error>>(16384);
65
66    let fut = async move {
67        let stream = client.receive_input_messages(request).await;
68
69        match stream {
70            Ok(mut stream) => {
71                let stream = stream.get_mut();
72
73                while let Ok(Some(response)) = stream.message().await {
74                    match response.payload {
75                        Some(payload) => {
76                            match payload.message {
77                                Some(message) => {
78                                    if tx.send(Ok(message.data)).await.is_err() {
79                                        break;
80                                    }
81                                }
82                                None => {
83                                    if tx
84                                        .send(Err(anyhow::anyhow!("Empty input payload")))
85                                        .await
86                                        .is_err()
87                                    {
88                                        break;
89                                    }
90                                }
91                            };
92                        }
93                        None => {
94                            if tx
95                                .send(Err(anyhow::anyhow!("Empty input response")))
96                                .await
97                                .is_err()
98                            {
99                                break;
100                            }
101                        }
102                    };
103                }
104            }
105            Err(e) => {
106                let _ = tx.send(Err(e.into())).await;
107            }
108        }
109    };
110
111    tokio::spawn(fut);
112    Ok(rx)
113}
114
115pub async fn describe_input(
116    auth_builder: Arc<impl IntoAuthBuilder>,
117    twin_id: &str,
118    input_id: &str,
119    remote_host_id: Option<&str>,
120) -> Result<DescribeInputResponse, anyhow::Error> {
121    let channel = create_channel(auth_builder.clone(), None, None, None).await?;
122    describe_input_with_channel(auth_builder, channel, twin_id, input_id, remote_host_id).await
123}
124
125pub async fn describe_input_with_channel(
126    auth_builder: Arc<impl IntoAuthBuilder>,
127    channel: Channel,
128    twin_id: &str,
129    input_id: &str,
130    remote_host_id: Option<&str>,
131) -> Result<DescribeInputResponse, anyhow::Error> {
132    let mut client = InputApiClient::new(channel);
133    let client_app_id = generate_client_app_id();
134    let transaction_ref = vec![client_app_id.clone()];
135
136    let headers = Headers {
137        client_app_id,
138        transaction_ref: transaction_ref.clone(),
139        ..Default::default()
140    };
141
142    let mut request = tonic::Request::new(DescribeInputRequest {
143        headers: Some(headers),
144        args: Some(describe_input_request::Arguments {
145            input_id: Some(InputId {
146                id: input_id.to_string(),
147                twin_id: twin_id.to_string(),
148                host_id: remote_host_id.unwrap_or_default().to_string(),
149            }),
150        }),
151    });
152
153    let token = auth_builder.get_token()?;
154
155    request.metadata_mut().append(
156        "authorization",
157        token.parse().context("parse token failed")?,
158    );
159
160    let result = client.describe_input(request).await.with_context(|| {
161        format!(
162            "Describing input failed, transaction ref [{}]",
163            transaction_ref.join(", ")
164        )
165    })?;
166    let result = result.into_inner();
167
168    Ok(result)
169}
170
171pub async fn delete_input(
172    auth_builder: Arc<impl IntoAuthBuilder>,
173    twin_id: &str,
174    input_id: &str,
175) -> Result<DeleteInputResponse, anyhow::Error> {
176    let channel = create_channel(auth_builder.clone(), None, None, None).await?;
177    delete_input_with_client(auth_builder, channel, twin_id, input_id).await
178}
179
180pub async fn delete_input_with_client(
181    auth_builder: Arc<impl IntoAuthBuilder>,
182    channel: Channel,
183    twin_id: &str,
184    input_id: &str,
185) -> Result<DeleteInputResponse, anyhow::Error> {
186    let mut client = InputApiClient::new(channel);
187    let client_app_id = generate_client_app_id();
188    let transaction_ref = vec![client_app_id.clone()];
189
190    let headers = Headers {
191        client_app_id,
192        transaction_ref: transaction_ref.clone(),
193        ..Default::default()
194    };
195
196    let mut request = tonic::Request::new(DeleteInputRequest {
197        headers: Some(headers),
198        args: Some(delete_input_request::Arguments {
199            input_id: Some(InputId {
200                id: input_id.to_string(),
201                twin_id: twin_id.to_string(),
202                ..Default::default()
203            }),
204        }),
205    });
206
207    let token = auth_builder.get_token()?;
208
209    request.metadata_mut().append(
210        "authorization",
211        token.parse().context("parse token failed")?,
212    );
213
214    let result = client.delete_input(request).await.with_context(|| {
215        format!(
216            "Deleting input failed, transaction ref [{}]",
217            transaction_ref.join(", ")
218        )
219    })?;
220    let result = result.into_inner();
221
222    Ok(result)
223}