openiap_client/grpc.rs
1// use std::ops::AddAssign;
2use tracing::{trace, error, debug};
3
4use openiap_proto::errors::OpenIAPError;
5use tonic::Request;
6use tokio_stream::wrappers::ReceiverStream;
7use futures::StreamExt;
8use crate::{Client, ClientEnum, ClientState};
9use tokio::sync::mpsc;
10use tokio::time::{timeout, Duration};
11use tonic::transport::Channel;
12pub use openiap_proto::*;
13pub use protos::flow_service_client::FlowServiceClient;
14impl Client {
15 /// Connect to the server using gRPC protocol.
16 pub async fn connect_grpc(url: String) -> Result<FlowServiceClient<Channel>, Box<dyn std::error::Error>> {
17 // let response = FlowServiceClient::connect(strurl.clone()).await;
18 // let channel = Channel::from_shared(url)?
19 // .keep_alive_timeout(Duration::from_secs(10))
20 // .keep_alive_while_idle(true)
21 // // .timeout(Duration::from_secs(60 * 24 * 60))
22 // .http2_keep_alive_interval(Duration::from_secs(10))
23 // .connect_timeout(Duration::from_secs(60))
24 // .connect()
25 // .await?;
26 // let client = FlowServiceClient::new(channel);
27 let client = FlowServiceClient::connect(url).await?;
28 Ok(client)
29 }
30 /// internal function, used to setup gRPC stream used for communication with the server.
31 /// This function is called by [connect] and should not be called directly.
32 /// It will "pre" process stream, watch and queue events, and call future promises, when a response is received.
33 #[tracing::instrument(skip_all)]
34 pub async fn setup_grpc_stream(&self) -> Result<(), OpenIAPError> {
35 self.set_connected(ClientState::Connecting, None);
36 let mut client = match self.get_client() {
37 ClientEnum::Grpc(ref client) => client.clone(),
38 _ => {
39 return Err(OpenIAPError::ClientError("Invalid client".to_string()));
40 }
41 };
42 let (_new_stream_tx, stream_rx) = mpsc::channel(60);
43 let in_stream = ReceiverStream::new(stream_rx);
44
45 let response = client.setup_stream(Request::new(in_stream)).await;
46 let response = match response {
47 Ok(response) => response,
48 Err(e) => {
49 return Err(OpenIAPError::ClientError(format!(
50 "Failed to setup stream: {}",
51 e
52 )));
53 }
54 };
55
56 self.set_msgcount(-1); // Reset message count
57
58 let envelope_receiver = self.out_envelope_receiver.clone();
59 let me = self.clone();
60 // let sender = tokio::task::Builder::new().name("GRPC envelope sender").spawn(async move {
61 let sender = tokio::task::spawn(async move {
62 loop {
63 let envelope = envelope_receiver.recv().await;
64 let mut envelope = match envelope {
65 Ok(envelope) => envelope,
66 Err(e) => {
67 error!("Failed to receive message from envelope receiver: {:?}", e);
68 me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
69 return;
70 }
71 };
72 envelope.seq = me.inc_msgcount();
73 if envelope.id.is_empty() {
74 envelope.id = envelope.seq.to_string();
75 }
76 let command = envelope.command.clone();
77 if envelope.rid.is_empty() {
78 debug!("Send #{} #{} {} message", envelope.seq, envelope.id, command);
79 } else {
80 debug!("Send #{} #{} (reply to #{}) {} message", envelope.seq, envelope.id, envelope.rid, command);
81 }
82
83 // trace!("Begin sending a {} message to the server", command);
84 match _new_stream_tx.send(envelope).await {
85 Ok(_) => {
86 trace!("Successfully sent a {} message to the server", command);
87 },
88 Err(e) => {
89 error!("Failed to send message to gRPC stream: {:?}", e);
90 me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
91 return;
92 }
93 };
94 }
95 }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn GRPC envelope sender task: {:?}", e)))?;
96 self.push_handle(sender);
97 let mut resp_stream = response.into_inner();
98 let me = self.clone();
99 // let reader = tokio::task::Builder::new().name("GRPC envelope receiver").spawn(async move {
100 let reader = tokio::task::spawn(async move {
101 loop {
102 let read = timeout(Duration::from_secs(5), resp_stream.next()).await;
103 match read {
104 Ok(data) => {
105 match data {
106 Some(received) => {
107 match received {
108 Ok(received) => {
109 me.parse_incomming_envelope(received).await;
110 }
111 Err(e) => {
112 // error!("Received error from stream: {:?}", e);
113 me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
114 break;
115 }
116 }
117 }
118 None => {
119 me.set_connected(ClientState::Disconnected, Some("Server closed the connection"));
120 break;
121 }
122 }
123 }
124 Err(_e) => {
125 // timeout elapsed
126 }
127 }
128 }
129 }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn GRPC envelope receiver task: {:?}", e)))?;
130 self.push_handle(reader);
131 Ok(())
132 }
133}