openiap_client/
grpc.rs

1// use std::ops::AddAssign;
2use tracing::{trace, error, debug};
3
4use openiap_proto::errors::OpenIAPError;
5use tonic::Request;
6use tokio_stream::wrappers::ReceiverStream;
7use futures::StreamExt;
8use crate::{Client, ClientEnum, ClientState};
9use tokio::sync::mpsc;
10use tokio::time::{timeout, Duration};
11use tonic::transport::Channel;
12pub use openiap_proto::*;
13pub use protos::flow_service_client::FlowServiceClient;
14impl Client {
15    /// Connect to the server using gRPC protocol.
16    pub async fn connect_grpc(url: String) -> Result<FlowServiceClient<Channel>, Box<dyn std::error::Error>> {
17        // let response = FlowServiceClient::connect(strurl.clone()).await;
18        // let channel = Channel::from_shared(url)?
19        //     .keep_alive_timeout(Duration::from_secs(10))
20        //     .keep_alive_while_idle(true)
21        //     // .timeout(Duration::from_secs(60 * 24 * 60))
22        //     .http2_keep_alive_interval(Duration::from_secs(10))
23        //     .connect_timeout(Duration::from_secs(60))
24        //     .connect()
25        //     .await?;
26        // let client = FlowServiceClient::new(channel);
27        let client = FlowServiceClient::connect(url).await?;
28        Ok(client)
29    }
30    /// internal function, used to setup gRPC stream used for communication with the server.
31    /// This function is called by [connect] and should not be called directly.
32    /// It will "pre" process stream, watch and queue events, and call future promises, when a response is received.
33    #[tracing::instrument(skip_all)]
34    pub async fn setup_grpc_stream(&self) -> Result<(), OpenIAPError> {
35        self.set_connected(ClientState::Connecting, None);
36        let mut client = match self.get_client() {
37            ClientEnum::Grpc(ref client) => client.clone(),
38            _ => {
39                return Err(OpenIAPError::ClientError("Invalid client".to_string()));
40            }
41        };
42        let (_new_stream_tx, stream_rx) = mpsc::channel(60);
43        let in_stream = ReceiverStream::new(stream_rx);
44
45        let response = client.setup_stream(Request::new(in_stream)).await;
46        let response = match response {
47            Ok(response) => response,
48            Err(e) => {
49                return Err(OpenIAPError::ClientError(format!(
50                    "Failed to setup stream: {}",
51                    e
52                )));
53            }
54        };
55
56        self.set_msgcount(-1); // Reset message count
57
58        let envelope_receiver = self.out_envelope_receiver.clone();
59        let me = self.clone();
60        // let sender = tokio::task::Builder::new().name("GRPC envelope sender").spawn(async move {
61        let sender = tokio::task::spawn(async move {
62            loop {
63                let envelope = envelope_receiver.recv().await;
64                let mut envelope = match envelope {
65                    Ok(envelope) => envelope,
66                    Err(e) => {
67                        error!("Failed to receive message from envelope receiver: {:?}", e);
68                        me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
69                        return;
70                    }
71                };
72                envelope.seq = me.inc_msgcount();
73                if envelope.id.is_empty() {
74                    envelope.id = envelope.seq.to_string();
75                }
76                let command = envelope.command.clone();
77                if envelope.rid.is_empty() {
78                    debug!("Send #{} #{} {} message", envelope.seq, envelope.id, command);
79                } else {
80                    debug!("Send #{} #{} (reply to #{}) {} message", envelope.seq, envelope.id, envelope.rid, command);
81                }
82
83                // trace!("Begin sending a {} message to the server", command);
84                match _new_stream_tx.send(envelope).await {
85                    Ok(_) => {
86                        trace!("Successfully sent a {} message to the server", command);
87                    },
88                    Err(e) => {
89                        error!("Failed to send message to gRPC stream: {:?}", e);
90                        me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
91                        return;
92                    }
93                };
94            }
95        }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn GRPC envelope sender task: {:?}", e)))?;
96        self.push_handle(sender);
97        let mut resp_stream = response.into_inner();
98        let me = self.clone();
99        // let reader = tokio::task::Builder::new().name("GRPC envelope receiver").spawn(async move {
100        let reader = tokio::task::spawn(async move {
101            loop {
102                let read = timeout(Duration::from_secs(5), resp_stream.next()).await;
103                match read {
104                    Ok(data) => {
105                        match  data {
106                            Some(received) => {
107                                match received {
108                                    Ok(received) => {
109                                        me.parse_incomming_envelope(received).await;
110                                    }
111                                    Err(e) => {
112                                        // error!("Received error from stream: {:?}", e);
113                                        me.set_connected(ClientState::Disconnected, Some(&e.to_string()));
114                                        break;
115                                    }                                        
116                                }
117                            }
118                            None => {
119                                me.set_connected(ClientState::Disconnected, Some("Server closed the connection"));
120                                break;
121                            }                                
122                        }
123                    }
124                    Err(_e) => {
125                        // timeout elapsed
126                    }                        
127                }
128            }
129        }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn GRPC envelope receiver task: {:?}", e)))?;
130        self.push_handle(reader);
131        Ok(())
132    }
133}