1use anyhow::Context;
2use std::sync::Arc;
3use tokio::sync::mpsc;
4use tonic::transport::Channel;
5
6use crate::client::iotics::api::input_api_client::InputApiClient;
7use crate::client::iotics::api::{
8 delete_input_request, describe_input_request, receive_input_message_request,
9 DeleteInputResponse, DescribeInputResponse,
10};
11
12pub use crate::client::iotics::api::{
13 DeleteInputRequest, DescribeInputRequest, Headers, InputId, ReceiveInputMessageRequest,
14 ReceiveInputMessageResponse,
15};
16
17use crate::auth_builder::IntoAuthBuilder;
18use crate::channel::create_channel;
19use crate::helpers::generate_client_app_id;
20
21pub async fn receive_input_messages(
22 auth_builder: Arc<impl IntoAuthBuilder>,
23 twin_id: &str,
24 input_id: &str,
25) -> Result<mpsc::Receiver<Result<Vec<u8>, anyhow::Error>>, anyhow::Error> {
26 let channel = create_channel(auth_builder.clone(), None, None, None).await?;
27 receive_input_messages_with_channel(auth_builder, channel, twin_id, input_id).await
28}
29
30pub async fn receive_input_messages_with_channel(
31 auth_builder: Arc<impl IntoAuthBuilder>,
32 channel: Channel,
33 twin_id: &str,
34 input_id: &str,
35) -> Result<mpsc::Receiver<Result<Vec<u8>, anyhow::Error>>, anyhow::Error> {
36 let mut client = InputApiClient::new(channel);
37 let client_app_id = generate_client_app_id();
38 let transaction_ref = vec![client_app_id.clone()];
39
40 let headers = Headers {
41 client_app_id,
42 transaction_ref,
43 ..Default::default()
44 };
45
46 let mut request = tonic::Request::new(ReceiveInputMessageRequest {
47 headers: Some(headers),
48 args: Some(receive_input_message_request::Arguments {
49 input_id: Some(InputId {
50 id: input_id.to_string(),
51 twin_id: twin_id.to_string(),
52 ..Default::default()
53 }),
54 }),
55 });
56
57 let token = auth_builder.get_token()?;
58
59 request.metadata_mut().append(
60 "authorization",
61 token.parse().context("parse token failed")?,
62 );
63
64 let (tx, rx) = mpsc::channel::<Result<Vec<u8>, anyhow::Error>>(16384);
65
66 let fut = async move {
67 let stream = client.receive_input_messages(request).await;
68
69 match stream {
70 Ok(mut stream) => {
71 let stream = stream.get_mut();
72
73 while let Ok(Some(response)) = stream.message().await {
74 match response.payload {
75 Some(payload) => {
76 match payload.message {
77 Some(message) => {
78 if tx.send(Ok(message.data)).await.is_err() {
79 break;
80 }
81 }
82 None => {
83 if tx
84 .send(Err(anyhow::anyhow!("Empty input payload")))
85 .await
86 .is_err()
87 {
88 break;
89 }
90 }
91 };
92 }
93 None => {
94 if tx
95 .send(Err(anyhow::anyhow!("Empty input response")))
96 .await
97 .is_err()
98 {
99 break;
100 }
101 }
102 };
103 }
104 }
105 Err(e) => {
106 let _ = tx.send(Err(e.into())).await;
107 }
108 }
109 };
110
111 tokio::spawn(fut);
112 Ok(rx)
113}
114
115pub async fn describe_input(
116 auth_builder: Arc<impl IntoAuthBuilder>,
117 twin_id: &str,
118 input_id: &str,
119 remote_host_id: Option<&str>,
120) -> Result<DescribeInputResponse, anyhow::Error> {
121 let channel = create_channel(auth_builder.clone(), None, None, None).await?;
122 describe_input_with_channel(auth_builder, channel, twin_id, input_id, remote_host_id).await
123}
124
125pub async fn describe_input_with_channel(
126 auth_builder: Arc<impl IntoAuthBuilder>,
127 channel: Channel,
128 twin_id: &str,
129 input_id: &str,
130 remote_host_id: Option<&str>,
131) -> Result<DescribeInputResponse, anyhow::Error> {
132 let mut client = InputApiClient::new(channel);
133 let client_app_id = generate_client_app_id();
134 let transaction_ref = vec![client_app_id.clone()];
135
136 let headers = Headers {
137 client_app_id,
138 transaction_ref: transaction_ref.clone(),
139 ..Default::default()
140 };
141
142 let mut request = tonic::Request::new(DescribeInputRequest {
143 headers: Some(headers),
144 args: Some(describe_input_request::Arguments {
145 input_id: Some(InputId {
146 id: input_id.to_string(),
147 twin_id: twin_id.to_string(),
148 host_id: remote_host_id.unwrap_or_default().to_string(),
149 }),
150 }),
151 });
152
153 let token = auth_builder.get_token()?;
154
155 request.metadata_mut().append(
156 "authorization",
157 token.parse().context("parse token failed")?,
158 );
159
160 let result = client.describe_input(request).await.with_context(|| {
161 format!(
162 "Describing input failed, transaction ref [{}]",
163 transaction_ref.join(", ")
164 )
165 })?;
166 let result = result.into_inner();
167
168 Ok(result)
169}
170
171pub async fn delete_input(
172 auth_builder: Arc<impl IntoAuthBuilder>,
173 twin_id: &str,
174 input_id: &str,
175) -> Result<DeleteInputResponse, anyhow::Error> {
176 let channel = create_channel(auth_builder.clone(), None, None, None).await?;
177 delete_input_with_client(auth_builder, channel, twin_id, input_id).await
178}
179
180pub async fn delete_input_with_client(
181 auth_builder: Arc<impl IntoAuthBuilder>,
182 channel: Channel,
183 twin_id: &str,
184 input_id: &str,
185) -> Result<DeleteInputResponse, anyhow::Error> {
186 let mut client = InputApiClient::new(channel);
187 let client_app_id = generate_client_app_id();
188 let transaction_ref = vec![client_app_id.clone()];
189
190 let headers = Headers {
191 client_app_id,
192 transaction_ref: transaction_ref.clone(),
193 ..Default::default()
194 };
195
196 let mut request = tonic::Request::new(DeleteInputRequest {
197 headers: Some(headers),
198 args: Some(delete_input_request::Arguments {
199 input_id: Some(InputId {
200 id: input_id.to_string(),
201 twin_id: twin_id.to_string(),
202 ..Default::default()
203 }),
204 }),
205 });
206
207 let token = auth_builder.get_token()?;
208
209 request.metadata_mut().append(
210 "authorization",
211 token.parse().context("parse token failed")?,
212 );
213
214 let result = client.delete_input(request).await.with_context(|| {
215 format!(
216 "Deleting input failed, transaction ref [{}]",
217 transaction_ref.join(", ")
218 )
219 })?;
220 let result = result.into_inner();
221
222 Ok(result)
223}