openiap_client/
lib.rs

1#![warn(missing_docs)]
2//! The `openiap.client` crate provides the [Client] struct and its methods.
//! For now this crate only provides gRPC and WebSocket clients; REST, named pipe and TCP clients will be added later.
4//! Initialize a new client, by calling the [Client::new_connect] method.
5//! ```
6//! use openiap_client::{OpenIAPError, Client, QueryRequest};
7//! #[tokio::main]
8//! async fn main() -> Result<(), OpenIAPError> {
9//!     let client = Client::new_connect("").await?;
10//!     let q = client.query( QueryRequest::with_projection(
11//!         "entities",
12//!         "{}",
13//!         "{\"name\":1}"
14//!     )).await?;
15//!     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
16//!     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
17//!     for item in items {
18//!         println!("Item: {:?}", item);
19//!     }
20//!     Ok(())
21//! }
22//! ```
23
24pub use openiap_proto::errors::*;
25pub use openiap_proto::protos::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use protos::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30
31use tokio::task::JoinHandle;
32use tokio_tungstenite::WebSocketStream;
33use tracing::{debug, error, info, trace};
// Boxed, thread-safe error type; it is the default error type of the `Result` alias that follows.
type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
// Crate-local `Result` alias: `Result<T>` means `Result<T, StdError>`, and a second type
// argument selects a different error type.
type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{set_otel_url, enable_tracing, disable_tracing};
55pub use crate::otel::{set_f64_observable_gauge, set_u64_observable_gauge, set_i64_observable_gauge, disable_observable_gauge};
56
// One-shot sender stored per pending query in `ClientInner::queries`; it carries an `Envelope`.
type QuerySender = oneshot::Sender<Envelope>;
// Sender stored per active stream in `ClientInner::streams`; it carries raw byte chunks.
type StreamSender = mpsc::Sender<Vec<u8>>;
// WebSocket stream over a TCP connection that may or may not be wrapped in TLS.
type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
use futures::StreamExt;
use async_channel::unbounded;
// Version string used as the default agent version and passed to telemetry initialisation.
const VERSION: &str = "0.0.32";
63
64
/// The `Client` struct provides the client for the OpenIAP service.
/// Initialize a new client, by calling the [Client::new_connect] method.
///
/// Every field is either wrapped in an `Arc` or is a cloneable channel handle, so a cloned
/// `Client` shares its state with the client it was cloned from.
#[derive(Clone)]
pub struct Client {
    /// Ensures the full connect sequence only runs once; later connect calls reconnect instead.
    connect_called: Arc<std::sync::Mutex<bool>>,
    /// The tokio runtime created by the blocking [Client::connect]; `None` until then.
    runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,

    /// Keep track of usage of the client
    stats: Arc<std::sync::Mutex<ClientStatistics>>,

    /// Join handles of spawned tasks.
    /// NOTE(review): presumably the client's background tasks; they are populated outside this view.
    task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
    /// The inner client object (gRPC or WebSocket); [ClientEnum::None] until connected.
    pub client: Arc<std::sync::Mutex<ClientEnum>>,
    /// The signed in user.
    user: Arc<std::sync::Mutex<Option<User>>>,
    /// The inner client.
    pub inner: Arc<Mutex<ClientInner>>,
    /// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
    pub config: Arc<std::sync::Mutex<Option<Config>>>,
    /// Should client automatically reconnect, if disconnected?
    pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
    /// URL used to connect to server, processed and without credentials
    pub url: Arc<std::sync::Mutex<String>>,
    /// Username used to connect to server
    pub username: Arc<std::sync::Mutex<String>>,
    /// Password used to connect to server
    pub password: Arc<std::sync::Mutex<String>>,
    /// JWT token used to connect to server
    pub jwt: Arc<std::sync::Mutex<String>>,
    /// Service name handed to telemetry initialisation; defaults to `"rust"`.
    service_name: Arc<std::sync::Mutex<String>>,
    /// Agent name handed to telemetry initialisation; defaults to `"rust"`.
    agent_name: Arc<std::sync::Mutex<String>>,
    /// Agent version handed to telemetry initialisation; defaults to the crate `VERSION`.
    agent_version: Arc<std::sync::Mutex<String>>,
    /// Sending half of the unbounded [ClientEvent] channel.
    event_sender: async_channel::Sender<ClientEvent>,
    /// Receiving half of the unbounded [ClientEvent] channel.
    event_receiver: async_channel::Receiver<ClientEvent>,
    /// Sending half of the unbounded channel of outgoing `Envelope`s.
    out_envelope_sender: async_channel::Sender<Envelope>,
    /// Receiving half of the unbounded channel of outgoing `Envelope`s.
    out_envelope_receiver: async_channel::Receiver<Envelope>,
    /// NOTE(review): looks like the name of the queue RPC replies are delivered to — confirm.
    rpc_reply_queue: Arc<tokio::sync::Mutex<Option<String>>>,
    /// NOTE(review): looks like the callback handling messages on the RPC reply queue — confirm.
    rpc_callback: Arc<tokio::sync::Mutex<Option<QueueCallbackFn>>>,

    /// The client connection state.
    pub state: Arc<std::sync::Mutex<ClientState>>,
    /// Increasing message count, used as unique id for messages. Starts at -1.
    pub msgcount: Arc<std::sync::Mutex<i32>>,
    /// Reconnect interval in milliseconds, this will slowly increase if we keep getting disconnected.
    pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
    /// The default timeout for requests (30 seconds for a new client).
    pub default_timeout: Arc<std::sync::Mutex<tokio::time::Duration>>,
}
/// The `ClientStatistics` struct provides the statistics for usage of the client.
///
/// Every field is a `u64` counter that starts at zero (via `Default`). A reference to the
/// shared statistics is handed to the telemetry initialisation when the `otel` feature is on.
#[derive(Clone, Default)]
pub struct ClientStatistics {
    // Connection-level counters.
    // NOTE(review): `package_tx` / `package_rx` presumably count sent / received packages — confirm.
    connection_attempts: u64,
    connections: u64,
    package_tx: u64,
    package_rx: u64,
    // One counter per client operation, named after the operation.
    // NOTE(review): presumably incremented by the matching `Client` method — confirm against callers.
    signin: u64,
    download: u64,
    getdocumentversion: u64,
    customcommand: u64,
    listcollections: u64,
    createcollection: u64,
    dropcollection: u64,
    ensurecustomer: u64,
    invokeopenrpa: u64,
    registerqueue: u64,
    registerexchange: u64,
    unregisterqueue: u64,
    watch: u64,
    unwatch: u64,
    queuemessage: u64,
    pushworkitem: u64,
    pushworkitems: u64,
    popworkitem: u64,
    updateworkitem: u64,
    deleteworkitem: u64,
    addworkitemqueue: u64,
    updateworkitemqueue: u64,
    deleteworkitemqueue: u64,
    getindexes: u64,
    createindex: u64,
    dropindex: u64,
    upload: u64,
    query: u64,
    count: u64,
    distinct: u64,
    aggregate: u64,
    insertone: u64,
    insertmany: u64,
    insertorupdateone: u64,
    insertorupdatemany: u64,
    updateone: u64,
    updatedocument: u64,
    deleteone: u64,
    deletemany: u64,
}
162// type QueueCallbackFn = Box<dyn Fn(&Client, QueueEvent) -> Option<String> + Send + Sync>;
163use futures::future::BoxFuture;
// Shared, thread-safe callback invoked with the client and a `QueueEvent`; it returns a boxed
// future resolving to an optional `String`.
// NOTE(review): the returned string looks like a reply payload — confirm against the caller.
type QueueCallbackFn =
    Arc<dyn Fn(Arc<Client>, QueueEvent) -> BoxFuture<'static, Option<String>> + Send + Sync>;
166
167// type ExchangeCallbackFn = Box<dyn Fn(&Client, QueueEvent) + Send + Sync>;
/// The `ClientInner` struct provides the inner client for the OpenIAP service.
///
/// It holds the lookup tables of pending queries, open streams, watches and queues. Each table
/// sits behind its own `Arc<Mutex<_>>`, so clones of `ClientInner` share the same tables.
#[derive(Clone)]
pub struct ClientInner {
    /// List of queries (messages sent to the server that we are waiting on a response for).
    pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
    /// Active streams the server (or client) has opened
    pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
    /// List of active watches (change streams), each mapped to its `WatchEvent` callback.
    #[allow(clippy::type_complexity)]
    pub watches:
        Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
    /// List of active queues (message queues / MQTT queues or exchanges), each mapped to its callback.
    #[allow(clippy::type_complexity)]
    pub queues:
        Arc<Mutex<std::collections::HashMap<String, QueueCallbackFn>>>,
}
/// Client enum, used to determine which client to use.
///
/// A new [Client] holds `None`; `connect_async` replaces it with `Grpc` or `WS` depending on
/// the URL it is given.
#[derive(Clone, Debug)]
pub enum ClientEnum {
    /// Not set yet
    None,
    /// Used when client wants to connect using gRPC
    Grpc(FlowServiceClient<tonic::transport::Channel>),
    /// Used when client wants to connect using websockets
    WS(Arc<Mutex<Sock>>)
}
/// Client event enum, used to determine which event has occurred.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientEvent {
    /// The client is connecting
    Connecting,
    /// The client has connected
    Connected,
    /// The client has disconnected; the string carries the reason
    Disconnected(String),
    /// The client has signed in
    SignedIn,
    // The variants below are not implemented yet and are kept as placeholders.
    // The client has signed out
    // SignedOut,
    // The client has received a message
    // Message(Envelope),
    // The client has received a ping event from the server
    // Ping,
    // The client has received a stream
    // Stream(Vec<u8>),
    // The client has received a watch event
    // Watch(WatchEvent),
    // The client has received a queue event
    // Queue(QueueEvent),
}
/// Client state enum, used to describe the current connection state of the client.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientState {
    /// The client is disconnected
    Disconnected,
    /// The client is connecting
    Connecting,
    /// The client is connected
    Connected,
    /// The client is signed in and connected
    Signedin
}
/// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
///
/// It is deserialized from the JSON body returned by the server's `/config` endpoint in
/// `Client::load_config`. Every field is marked `#[serde(default)]`, so keys missing from the
/// response fall back to the type's default value instead of failing deserialization.
#[derive(Debug, Clone, serde::Deserialize)]
// Most fields are only deserialized and never read inside the crate.
#[allow(dead_code)]
pub struct Config {
    #[serde(default)]
    wshost: String,
    #[serde(default)]
    wsurl: String,
    // When non-empty, `load_config` uses this as the API hostname handed to telemetry.
    #[serde(default)]
    domain: String,
    #[serde(default)]
    auto_create_users: bool,
    #[serde(default)]
    namespace: String,
    #[serde(default)]
    agent_domain_schema: String,
    #[serde(default)]
    version: String,
    #[serde(default)]
    validate_emails: bool,
    #[serde(default)]
    forgot_pass_emails: bool,
    #[serde(default)]
    supports_watch: bool,
    #[serde(default)]
    amqp_enabled_exchange: bool,
    #[serde(default)]
    multi_tenant: bool,
    #[serde(default)]
    enable_entity_restriction: bool,
    #[serde(default)]
    enable_web_tours: bool,
    #[serde(default)]
    collections_with_text_index: Vec<String>,
    #[serde(default)]
    timeseries_collections: Vec<String>,
    #[serde(default)]
    ping_clients_interval: i32,
    #[serde(default)]
    validlicense: bool,
    #[serde(default)]
    forceddomains: Vec<String>,
    #[serde(default)]
    grafana_url: String,
    // The three OTEL URLs below override the OTEL_METRIC_URL / OTEL_TRACE_URL / OTEL_LOG_URL
    // environment variables in `load_config` when they are non-empty.
    #[serde(default)]
    otel_metric_url: String,
    #[serde(default)]
    otel_trace_url: String,
    #[serde(default)]
    otel_log_url: String,
    // Controls whether `load_config` initialises telemetry (only with the `otel` feature).
    #[serde(default)]
    enable_analytics: bool,
}
283impl std::fmt::Debug for ClientInner {
284    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285        f.debug_struct("ClientInner")
286            // .field("client", &self.client)
287            .field("queries", &self.queries)
288            .field("streams", &self.streams)
289            .finish()
290    }
291}
292impl Default for Client {
293    fn default() -> Self {
294        Self::new()
295    }
296}
297
298impl Client {
299    /// Create a new client.
300    pub fn new() -> Self {
301        let (ces, cer) = unbounded::<ClientEvent>();
302        let (out_es, out_er) = unbounded::<Envelope>();
303        Self {
304            task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
305            stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
306            user: Arc::new(std::sync::Mutex::new(None)),
307            client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
308            connect_called: Arc::new(std::sync::Mutex::new(false)),
309            runtime: Arc::new(std::sync::Mutex::new(None)),
310            msgcount: Arc::new(std::sync::Mutex::new(-1)),
311            reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
312            rpc_reply_queue: Arc::new(tokio::sync::Mutex::new(None)),
313            rpc_callback: Arc::new(tokio::sync::Mutex::new(None)),
314            inner: Arc::new(Mutex::new(ClientInner {
315                queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
316                streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
317                watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
318                queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
319            })),
320            config: Arc::new(std::sync::Mutex::new(None)),
321            auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
322            url: Arc::new(std::sync::Mutex::new("".to_string())),
323            username: Arc::new(std::sync::Mutex::new("".to_string())),
324            password: Arc::new(std::sync::Mutex::new("".to_string())),
325            jwt: Arc::new(std::sync::Mutex::new("".to_string())),
326            service_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
327            agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
328            agent_version: Arc::new(std::sync::Mutex::new(VERSION.to_string())),
329            event_sender: ces,
330            event_receiver: cer,
331            out_envelope_sender: out_es,
332            out_envelope_receiver: out_er,
333            state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
334            default_timeout: Arc::new(std::sync::Mutex::new(Duration::from_secs(30))),
335        }
336    }
337    /// Connect the client to the OpenIAP server.
338    #[tracing::instrument(skip_all)]
339    pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
340        let rt = match tokio::runtime::Runtime::new() {
341            Ok(rt) => rt,
342            Err(e) => {
343                return Err(OpenIAPError::ClientError(format!(
344                    "Failed to create tokio runtime: {}",
345                    e
346                )));
347            }
348        };
349        self.set_runtime(Some(rt));
350        tokio::task::block_in_place(|| {
351            let handle = self.get_runtime_handle();
352            handle.block_on(self.connect_async(dst))
353        })
354    }
355
356    /// Load the configuration from the server.
357    #[allow(unused_variables)]
358    pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
359        let config: Option<Config>;
360        let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
361        let mut port = url.port().unwrap_or(80);
362        if issecure {
363            port = 443;
364        }
365        let mut host = url.host_str().unwrap_or("localhost.openiap.io").replace("grpc.", "");
366        if host.starts_with("api-grpc") {
367            host = "api".to_string();
368        }
369        if port == 50051 {
370            port = 3000;
371        }
372        let configurl = if issecure {
373            format!(
374                "{}://{}:{}/config",
375                "https",
376                host,
377                port
378            )
379        } else {
380            format!(
381                "{}://{}:{}/config",
382                "http",
383                host,
384                port
385            )
386        };
387
388        let configurl = url::Url::parse(configurl.as_str())
389            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
390        trace!("Getting config from: {}", configurl);
391        let o = minreq::get(configurl).with_timeout(5).send();
392        match o {
393            Ok(_) => {
394                let response = match o {
395                    Ok(response) => response,
396                    Err(e) => {
397                        error!("Failed to get config: {}", e);
398                        return None;
399                    }
400                };
401                if response.status_code == 200 {
402                    let body = response.as_str().unwrap();
403                    config = Some(match serde_json::from_str(body) {
404                        Ok(config) => config,
405                        Err(e) => {
406                            error!("Failed to parse config: {}", e);
407                            return None;
408                        }
409                    });
410                } else {
411                    config = None;
412                }
413            }
414            Err(e) => {
415                error!("Failed to get config: {}", e);
416                return None;
417            }
418        }
419        let mut _enable_analytics = true;
420        let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
421        let mut _otel_trace_url = std::env::var("OTEL_TRACE_URL").unwrap_or_default();
422        let mut _otel_log_url = std::env::var("OTEL_LOG_URL").unwrap_or_default();
423        let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
424        if apihostname.starts_with("grpc.") {
425            apihostname = apihostname[5..].to_string();
426        }
427    
428        if config.is_some() {
429            let config = config.as_ref().unwrap();
430            if !config.otel_metric_url.is_empty() {
431                _otel_metric_url = config.otel_metric_url.clone();
432            }
433            if !config.otel_trace_url.is_empty() {
434                _otel_trace_url = config.otel_trace_url.clone();
435            }
436            if !config.otel_log_url.is_empty() {
437                _otel_log_url = config.otel_log_url.clone();
438            }
439            if !config.domain.is_empty() {
440                apihostname = config.domain.clone();
441            }
442            _enable_analytics = config.enable_analytics;
443        }
444        #[cfg(feature = "otel")]
445        if _enable_analytics {
446            let service_name = self.get_service_name();
447            let agent_name = self.get_agent_name();
448            let agent_version = self.get_agent_version();
449            match otel::init_telemetry(&service_name, &agent_name, &agent_version, VERSION, &apihostname, _otel_metric_url.as_str(), 
450            _otel_trace_url.as_str(),  _otel_log_url.as_str(), 
451            &self.stats) {
452                Ok(_) => (),
453                Err(e) => {
454                    error!("Failed to initialize telemetry: {}", e);
455                    return None;
456                }
457            }
458        }
459        config
460    }
461
462    /// Connect the client to the OpenIAP server.
463    #[tracing::instrument(skip_all)]
464    pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
465        #[cfg(test)]
466        {   
467            // enable_tracing("openiap=trace", "new");
468            // enable_tracing("openiap=debug", "new");
469            // enable_tracing("trace", "");
470            enable_tracing("openiap=error", "");
471            // enable_tracing("openiap=debug", "");
472        }
473        if self.is_connect_called() {
474            self.set_auto_reconnect(true);
475            return self.reconnect().await;
476        }
477        let mut strurl = dst.to_string();
478        if strurl.is_empty() {
479            strurl = std::env::var("apiurl").unwrap_or("".to_string());
480            if strurl.is_empty() {
481                strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
482            }
483            if strurl.is_empty() {
484                strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
485            }
486        }
487        if strurl.is_empty() {
488            return Err(OpenIAPError::ClientError("No URL provided".to_string()));
489        }
490        let url = url::Url::parse(strurl.as_str())
491            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
492        let mut username = "".to_string();
493        let mut password = "".to_string();
494        if let Some(p) = url.password() {
495            password = p.to_string();
496        }
497        if !url.username().is_empty() {
498            username = url.username().to_string();
499        }
500        if !username.is_empty() && !password.is_empty() {
501            self.set_username(&username);
502            self.set_password(&password);
503        }
504        let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
505        if url.scheme() != "http"
506            && url.scheme() != "https"
507            && url.scheme() != "grpc"
508            && url.scheme() != "ws"
509            && url.scheme() != "wss"
510        {
511            return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
512        }
513        if url.scheme() == "grpc" {
514            if url.port() == Some(443) {
515                strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
516            } else { 
517                strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
518            }
519        }
520        let url = url::Url::parse(strurl.as_str())
521            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
522        if url.port().is_none() {
523            strurl = format!(
524                "{}://{}",
525                url.scheme(),
526                url.host_str().unwrap_or("app.openiap.io")
527            );
528        } else {
529            strurl = format!(
530                "{}://{}:{}",
531                url.scheme(),
532                url.host_str().unwrap_or("localhost.openiap.io"),
533                url.port().unwrap_or(80)
534            );
535        }
536        debug!("Connecting to {}", strurl);
537        let config = self.load_config(strurl.as_str(), &url).await;
538        if !usegprc {
539            strurl = format!("{}/ws/v2", strurl);
540
541            let (_stream_tx, stream_rx) = mpsc::channel(60);
542
543            let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
544                Ok((socket, _)) => socket,
545                Err(e) => {
546                    return Err(OpenIAPError::ClientError(format!(
547                        "Failed to connect to WS: {}",
548                        e
549                    )));
550                }
551            };
552            self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
553            self.set_connect_called(true);
554            self.set_config(config);
555            self.set_url(&strurl);
556            match self.setup_ws(&strurl).await {
557                Ok(_) => (),
558                Err(e) => {
559                    return Err(OpenIAPError::ClientError(format!(
560                        "Failed to setup WS: {}",
561                        e
562                    )));
563                }
564            }
565            let client2 = self.clone();
566            // tokio::task::Builder::new().name("Old Websocket receiver").spawn(async move {
567            tokio::task::spawn(async move {
568                tokio_stream::wrappers::ReceiverStream::new(stream_rx)
569                    .for_each(|envelope: Envelope| async {
570                        let command = envelope.command.clone();
571                        let rid = envelope.rid.clone();
572                        let id = envelope.id.clone();
573                        trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
574                        client2.parse_incomming_envelope(envelope).await;
575                    })
576                    .await;
577            }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn Old Websocket receiver task: {:?}", e)))?;
578        } else {
579            if url.scheme() == "http" {
580                let response = Client::connect_grpc(strurl.clone()).await;
581                match response {
582                    Ok(client) => {
583                        self.set_client(ClientEnum::Grpc(client));
584                    }
585                    Err(e) => {
586                        return Err(OpenIAPError::ClientError(format!(
587                            "Failed to connect: {}",
588                            e
589                        )));
590                    }
591                }
592            } else {
593                let uri = tonic::transport::Uri::builder()
594                    .scheme(url.scheme())
595                    .authority(url.host().unwrap().to_string())
596                    .path_and_query("/")
597                    .build();
598                let uri = match uri {
599                    Ok(uri) => uri,
600                    Err(e) => {
601                        return Err(OpenIAPError::ClientError(format!(
602                            "Failed to build URI: {}",
603                            e
604                        )));
605                    }
606                };
607                let channel = Channel::builder(uri)
608                    .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
609                let channel = match channel {
610                    Ok(channel) => channel,
611                    Err(e) => {
612                        return Err(OpenIAPError::ClientError(format!(
613                            "Failed to build channel: {}",
614                            e
615                        )));
616                    }
617                };
618                let channel = channel.connect().await;
619                let channel = match channel {
620                    Ok(channel) => channel,
621                    Err(e) => {
622                        return Err(OpenIAPError::ClientError(format!(
623                            "Failed to connect: {}",
624                            e
625                        )));
626                    }
627                };
628                self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
629            }
630            self.set_connect_called(true);
631            self.set_config(config);
632            self.set_url(&strurl);
633            self.setup_grpc_stream().await?;
634        };
635        self.post_connected().await
636    }
637
638    /// Connect will initializes a new client and starts a connection to an OpenIAP server.\
639    /// Use "" to autodetect the server from the environment variables (apiurl or grpcapiurl), or provide a URL.\
640    /// \
641    /// You can add username and password, to login using local provider, or set them using OPENIAP_USERNAME and OPENIAP_PASSWORD environment variables.
642    /// It is highly recommended to not user username and password, but instead use a JWT token, set using the OPENIAP_JWT (or jwt) environment variable.
643    /// You can use the openiap vs.code extension to manage this, if you need to generate one your self, login to the OpenIAP server and then open the /jwtlong page.
644    /// If credentials are not provided, the client will run as guest.\
645    /// If credentials are found, it will call [Client::signin] after successfully connecting to the server.
646    /// 
647    /// To troubleshoot issues, call [enable_tracing].
648    /// ```
649    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
650    /// #[tokio::main]
651    /// async fn main() -> Result<(), OpenIAPError> {
652    ///     let client = Client::new_connect("").await?;
653    ///     let q = client.query( QueryRequest::with_projection(
654    ///         "entities",
655    ///         "{}",
656    ///         "{\"name\":1}"
657    ///     )).await?;
658    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
659    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
660    ///     for item in items {
661    ///         println!("Item: {:?}", item);
662    ///     }
663    ///     Ok(())
664    /// }
665    /// ```
666    #[tracing::instrument(skip_all)]
667    pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
668        #[cfg(test)]
669        {
670            enable_tracing("openiap=error", "");
671        }
672        let client  = Client::new();
673        client.connect_async(dst).await?;
674        Ok(client)
675    }
676    /// Handle auto-signin after a connection has been established.
677    pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
678        if self.get_username().is_empty() && self.get_password().is_empty() {
679            self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
680            self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
681        }
682        if !self.get_username().is_empty() && !self.get_password().is_empty() {
683            debug!("Signing in with username: {}", self.get_username());
684            let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
685            let loginresponse = self.signin(signin).await;
686            match loginresponse {
687                Ok(response) => {
688                    self.reset_reconnect_ms();
689                    self.set_connected(ClientState::Connected, None);
690                    info!("Signed in as {}", response.user.as_ref().unwrap().username);
691                    Ok(())
692                }
693                Err(_e) => {
694                    self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
695                    Err(_e)
696                }
697            }
698        } else {
699            self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
700            if self.get_jwt().is_empty() {
701                self.set_jwt(&std::env::var("jwt").unwrap_or_default());
702            }
703            if !self.get_jwt().is_empty() {
704                debug!("Signing in with JWT");
705                let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
706                let loginresponse = self.signin(signin).await;
707                match loginresponse {
708                    Ok(response) => match response.user {
709                        Some(user) => {
710                            self.reset_reconnect_ms();
711                            info!("Signed in as {}", user.username);
712                            self.set_connected(ClientState::Connected, None);
713                            Ok(())
714                        }
715                        None => {
716                            self.reset_reconnect_ms();
717                            info!("Signed in as guest");
718                            self.set_connected(ClientState::Connected, None);
719                            Ok(())
720                            // Err(OpenIAPError::ClientError("Signin returned no user object".to_string()))
721                        }
722                    },
723                    Err(_e) => {
724                        self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
725                        Err(_e)
726                    }
727                }
728            } else {
729                self.reset_reconnect_ms();
730                match self.get_element().await {
731                    Ok(_) => {
732                        debug!("Connected, No credentials provided so is running as guest");
733                        self.set_connected(ClientState::Connected, None);
734                        Ok(())
735                    },
736                    Err(e) => {
737                        self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
738                        Err(e)
739                    }
740                }
741            }
742        }
743    }
744    /// Reconnect will attempt to reconnect to the OpenIAP server.
745    #[tracing::instrument(skip_all)]
746    pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
747        let state = self.get_state();
748        if state == ClientState::Connected || state == ClientState::Signedin {
749            return Ok(());
750        }
751        if !self.is_auto_reconnect() {
752            return Ok(());   
753        }
754        let client = self.get_client();
755    
756        match client {
757            ClientEnum::WS(ref _client) => {
758                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
759                self.setup_ws(&self.get_url()).await?;
760                debug!("Completed reconnecting to websocket");
761                self.post_connected().await
762            }
763            ClientEnum::Grpc(ref _client) => {
764                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
765                match self.setup_grpc_stream().await {
766                    Ok(_) => {
767                        debug!("Completed reconnecting to gRPC");
768                        self.post_connected().await
769                    },
770                    Err(e) => {
771                        return Err(OpenIAPError::ClientError(format!(
772                            "Failed to setup gRPC stream: {}",
773                            e
774                        )));
775                    }
776                }
777            }
778            ClientEnum::None => {
779                return Err(OpenIAPError::ClientError("Invalid client".to_string()));
780            }
781        }
782    }
783    /// Disconnect the client from the OpenIAP server.
784    pub fn disconnect(&self) {
785        self.set_auto_reconnect(false);
786        self.set_connected(ClientState::Disconnected, Some("Disconnected"));
787    }
788    /// Set the connected flag to true or false
789    pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
790        {
791            let current = self.get_state();
792            trace!("Set connected: {:?} from {:?}", state, current);
793            if state == ClientState::Connected && current == ClientState::Signedin {
794                self.set_state(ClientState::Signedin);
795            } else {
796                self.set_state(state.clone());
797            }
798            if state == ClientState::Disconnected && !current.eq(&state) {
799                let me = self.clone();
800                tokio::task::spawn(async move {
801                    let mut reply_queue_guard = me.rpc_reply_queue.lock().await;
802                    let mut callback_guard = me.rpc_callback.lock().await;
803                    *reply_queue_guard = None;
804                    *callback_guard = None;
805                });
806            }
807            if state == ClientState::Connecting && !current.eq(&state) {
808                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
809                    self.stats.lock().unwrap().connection_attempts += 1;
810                    let me = self.clone();
811                    tokio::task::spawn(async move {
812                        me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
813                    });
814                    // match tokio::task::Builder::new().name("setconnected-notify-connecting").spawn(async move {
815                    //     me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
816                    // }) {
817                    //     Ok(_) => (),
818                    //     Err(e) => {
819                    //         error!("Failed to spawn setconnected-notify-connecting task: {:?}", e);
820                    //     }
821                    // }
822                }
823
824            }
825            if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) { 
826                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
827                    self.stats.lock().unwrap().connections += 1;
828                    let me = self.clone();
829                    tokio::task::spawn(async move {
830                        me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
831                    });
832                    // match tokio::task::Builder::new().name("setconnected-notify-connected").spawn(async move {
833                    //     me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
834                    // }) {
835                    //     Ok(_) => (),
836                    //     Err(e) => {
837                    //         error!("Failed to spawn setconnected-notify-connected task: {:?}", e);
838                    //     }
839                    // }
840                }
841            }
842            if state == ClientState::Signedin && current != ClientState::Signedin {
843                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
844                    let me = self.clone();
845                    tokio::task::spawn(async move {
846                        me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
847                    });
848                    // match tokio::task::Builder::new().name("set_signedin-notify-connected").spawn(async move {
849                    //     me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
850                    // }) {
851                    //     Ok(_) => (),
852                    //     Err(e) => {
853                    //         error!("Failed to spawn set_signedin-notify-connected task: {:?}", e);
854                    //     }
855                    // };
856                }
857            }
858            if state == ClientState::Disconnected && !current.eq(&state) {
859                if message.is_some() {
860                    debug!("Disconnected: {}", message.unwrap());
861                } else {
862                    debug!("Disconnected");
863                }
864                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
865                    let me = self.clone();
866                    let message = match message {
867                        Some(message) => message.to_string(),
868                        None => "".to_string(),
869                    };
870                    //if current != ClientState::Connecting {
871                        tokio::task::spawn(async move {
872                            me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
873                        });
874                        // match tokio::task::Builder::new().name("setconnected-notify-disconnected").spawn(async move {
875                        //     trace!("Disconnected: {}", message);
876                        //     me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
877                        // }) {
878                        //     Ok(_) => (),
879                        //     Err(e) => {
880                        //         error!("Failed to spawn setconnected-notify-disconnected task: {:?}", e);
881                        //     }
882                        // }
883                    //}
884                }
885
886                self.kill_handles();
887                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
888                    let client = self.clone();
889                    // match tokio::task::Builder::new().name("kill_handles").spawn(async move {
890                        tokio::task::spawn(async move {
891                        {
892                            let inner = client.inner.lock().await;
893                            let mut queries = inner.queries.lock().await;
894                            let ids = queries.keys().cloned().collect::<Vec<String>>();
895                            debug!("********************************************** Cleaning up");
896                            for id in ids {
897                                let err = ErrorResponse {
898                                    code: 500,
899                                    message: "Disconnected".to_string(),
900                                    stack: "".to_string(),
901                                };
902                                let envelope = err.to_envelope();
903                                let tx = queries.remove(&id).unwrap();
904                                debug!("kill query: {}", id);
905                                let _ = tx.send(envelope);
906                            }
907                            let mut streams = inner.streams.lock().await;
908                            let ids = streams.keys().cloned().collect::<Vec<String>>();
909                            for id in ids {
910                                let tx = streams.remove(&id).unwrap();
911                                debug!("kill stream: {}", id);
912                                let _ = tx.send(Vec::new()).await;
913                            }
914                            let mut queues = inner.queues.lock().await;
915                            let ids = queues.keys().cloned().collect::<Vec<String>>();
916                            for id in ids {
917                                let _ = queues.remove(&id).unwrap();
918                            }
919                            let mut watches = inner.watches.lock().await;
920                            let ids = watches.keys().cloned().collect::<Vec<String>>();
921                            for id in ids {
922                                let _ = watches.remove(&id).unwrap();
923                            }
924                            debug!("**********************************************************");
925                        }
926                        if client.is_auto_reconnect() {
927                            trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
928                            tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
929                            if client.is_auto_reconnect() {
930                                client.inc_reconnect_ms();
931                                // let mut client = client.clone();
932                                trace!("Reconnecting . . .");
933                                client.reconnect().await.unwrap_or_else(|e| {
934                                    error!("Failed to reconnect: {}", e);
935                                    client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
936                                });
937                            } else {
938                                debug!("Not reconnecting");
939                            }
940                        } else {
941                            debug!("Reconnecting disabled, stop now");
942                        }
943                    });
944                    //  {
945                    //     Ok(_) => (),
946                    //     Err(e) => {
947                    //         error!("Failed to spawn kill_handles task: {:?}", e);
948                    //     }
949                    // }
950                }
951    
952            }
953        }
954    }
955    /// Get client state
956    pub fn get_state(&self) -> ClientState {
957        let conn = self.state.lock().unwrap();
958        conn.clone()
959    }
960    /// Set client state
961    pub fn set_state(&self, state: ClientState) {
962        let mut conn = self.state.lock().unwrap();
963        *conn = state;
964    }
965    /// Set the msgcount value
966    pub fn set_msgcount(&self, msgcount: i32) {
967        let mut current = self.msgcount.lock().unwrap();
968        trace!("Set msgcount: {} from {}", msgcount, *current);
969        *current = msgcount;
970    }
971    /// Increment the msgcount value
972    pub fn inc_msgcount(&self) -> i32 {
973        let mut current = self.msgcount.lock().unwrap();
974        *current += 1;
975        *current
976    }
977    /// Return value of reconnect_ms
978    pub fn get_reconnect_ms(&self) -> i32 {
979        let reconnect_ms = self.reconnect_ms.lock().unwrap();
980        *reconnect_ms
981    }
982    /// Increment the reconnect_ms value
983    pub fn reset_reconnect_ms(&self) {
984        let mut current = self.reconnect_ms.lock().unwrap();
985        *current = 500;
986    }
987    /// Increment the reconnect_ms value
988    pub fn inc_reconnect_ms(&self) -> i32 {
989        let mut current = self.reconnect_ms.lock().unwrap();
990        if *current < 30000 {
991            *current += 500;
992        }
993        *current
994    }
995    
996    /// Push tokio task handle to the task_handles vector
997    pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
998        let mut handles = self.task_handles.lock().unwrap();
999        handles.push(handle);
1000    }
1001    /// Kill all tokio task handles in the task_handles vector
1002    pub fn kill_handles(&self) {
1003        let mut handles = self.task_handles.lock().unwrap();
1004        for handle in handles.iter() {
1005            // let id = handle.id();
1006            // debug!("Killing handle {}", id);
1007            debug!("Killing handle");
1008            if !handle.is_finished() {
1009                handle.abort();
1010            }
1011        }
1012        handles.clear();
1013        // if let Ok(_handle) = tokio::runtime::Handle::try_current() {
1014        //     let runtime = self.get_runtime();
1015        //     if let Some(rt) = runtime.lock().unwrap().take() {
1016        //         rt.shutdown_background();
1017        //     }
1018        // }
1019    }
1020
1021
1022    /// Return value of the msgcount flag
1023    #[tracing::instrument(skip_all)]
1024    fn get_msgcount(&self) -> i32 {
1025        let msgcount = self.msgcount.lock().unwrap();
1026        *msgcount
1027    }
1028    /// Set the default timeout for the client commands
1029    pub fn set_default_timeout(&self, timeout: Duration) {
1030        let mut current = self.default_timeout.lock().unwrap();
1031        trace!("Set default_timeout: {} from {:?}", timeout.as_secs(), current.as_secs());
1032        *current = timeout;
1033    }
1034    /// Return the default timeout for the client commands
1035    pub fn get_default_timeout(&self) -> Duration {
1036        let current = self.default_timeout.lock().unwrap();
1037        current.clone()
1038    }
1039    /// Set the connect_called flag to true or false
1040    #[tracing::instrument(skip_all)]
1041    pub fn set_connect_called(&self, connect_called: bool) {
1042        let mut current = self.connect_called.lock().unwrap();
1043        trace!("Set connect_called: {} from {}", connect_called, *current);
1044        *current = connect_called;
1045    }
1046    /// Return value of the connect_called flag
1047    #[tracing::instrument(skip_all)]
1048    fn is_connect_called(&self) -> bool {
1049        let connect_called = self.connect_called.lock().unwrap();
1050        *connect_called
1051    }
1052    /// Set the auto_reconnect flag to true or false
1053    #[tracing::instrument(skip_all)]
1054    pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1055        let mut current = self.auto_reconnect.lock().unwrap();
1056        trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1057        *current = auto_reconnect;
1058    }
1059    /// Return value of the auto_reconnect flag
1060    #[tracing::instrument(skip_all)]
1061    fn is_auto_reconnect(&self) -> bool {
1062        let auto_reconnect = self.auto_reconnect.lock().unwrap();
1063        *auto_reconnect
1064    }
1065    /// Set the url flag to true or false
1066    #[tracing::instrument(skip_all)]
1067    pub fn set_url(&self, url: &str) {
1068        let mut current = self.url.lock().unwrap();
1069        trace!("Set url: {} from {}", url, *current);
1070        *current = url.to_string();
1071    }
1072    /// Return value of the url string
1073    #[tracing::instrument(skip_all)]
1074    fn get_url(&self) -> String {
1075        let url = self.url.lock().unwrap();
1076        url.to_string()
1077    }
1078    /// Set the username flag to true or false
1079    #[tracing::instrument(skip_all)]
1080    pub fn set_username(&self, username: &str) {
1081        let mut current = self.username.lock().unwrap();
1082        trace!("Set username: {} from {}", username, *current);
1083        *current = username.to_string();
1084    }
1085    /// Return value of the username string
1086    #[tracing::instrument(skip_all)]
1087    fn get_username(&self) -> String {
1088        let username = self.username.lock().unwrap();
1089        username.to_string()
1090    }
1091    /// Set the password value
1092    #[tracing::instrument(skip_all)]
1093    pub fn set_password(&self, password: &str) {
1094        let mut current = self.password.lock().unwrap();
1095        trace!("Set password: {} from {}", password, *current);
1096        *current = password.to_string();
1097    }
1098    /// Return value of the password string
1099    #[tracing::instrument(skip_all)]
1100    fn get_password(&self) -> String {
1101        let password = self.password.lock().unwrap();
1102        password.to_string()
1103    }
1104    /// Set the jwt flag to true or false
1105    #[tracing::instrument(skip_all)]
1106    pub fn set_jwt(&self, jwt: &str) {
1107        let mut current = self.jwt.lock().unwrap();
1108        trace!("Set jwt: {} from {}", jwt, *current);
1109        *current = jwt.to_string();
1110    }
1111    /// Return value of the jwt string
1112    #[tracing::instrument(skip_all)]
1113    fn get_jwt(&self) -> String {
1114        let jwt = self.jwt.lock().unwrap();
1115        jwt.to_string()
1116    }
1117    
1118    /// Set the service name
1119    #[tracing::instrument(skip_all)]
1120    pub fn set_service_name(&self, service_name: &str) {
1121        let mut current = self.service_name.lock().unwrap();
1122        trace!("Set servicename: {} from {}", service_name, *current);
1123        *current = service_name.to_string();
1124    }
1125    /// Return value of the service name string
1126    #[tracing::instrument(skip_all)]
1127    pub fn get_service_name(&self) -> String {
1128        let servicename = self.service_name.lock().unwrap();
1129        servicename.to_string()
1130    }
1131    /// Set the agent name
1132    #[tracing::instrument(skip_all)]
1133    pub fn set_agent_name(&self, agent: &str) {
1134        let mut current = self.agent_name.lock().unwrap();
1135        trace!("Set agent: {} from {}", agent, *current);
1136        *current = agent.to_string();
1137    }
1138    /// Return value of the agent string
1139    #[tracing::instrument(skip_all)]
1140    pub fn get_agent_name(&self) -> String {
1141        let agent = self.agent_name.lock().unwrap();
1142        agent.to_string()
1143    }
1144    /// Set the agent version number
1145    #[tracing::instrument(skip_all)]
1146    pub fn set_agent_version(&self, version: &str) {
1147        let mut current = self.agent_version.lock().unwrap();
1148        trace!("Set agent version: {} from {}", version, *current);
1149        *current = version.to_string();
1150    }
1151    /// Return value of the agent version string
1152    #[tracing::instrument(skip_all)]
1153    pub fn get_agent_version(&self) -> String {
1154        let agent_version = self.agent_version.lock().unwrap();
1155        agent_version.to_string()
1156    }
1157    
1158    /// Set the config flag to true or false
1159    #[tracing::instrument(skip_all)]
1160    pub fn set_config(&self, config: Option<Config>) {
1161        let mut current = self.config.lock().unwrap();
1162        *current = config;
1163    }
1164    /// Return value of the config 
1165    #[tracing::instrument(skip_all)]
1166    pub fn get_config(&self) -> Option<Config> {
1167        let config = self.config.lock().unwrap();
1168        config.clone()
1169    }
1170    /// Set the client flag to true or false
1171    #[tracing::instrument(skip_all)]
1172    pub fn set_client(&self, client: ClientEnum) {
1173        let mut current = self.client.lock().unwrap();
1174        *current = client;
1175    }
1176    /// Return value of the client
1177    #[tracing::instrument(skip_all)]
1178    fn get_client(&self) -> ClientEnum {
1179        let client = self.client.lock().unwrap();
1180        client.clone()
1181    }
1182    /// Set the user flag to true or false
1183    #[tracing::instrument(skip_all)]
1184    pub fn set_user(&self, user: Option<User>) {
1185        let mut current = self.user.lock().unwrap();
1186        *current = user;
1187    }
1188    /// Return value of the user
1189    #[tracing::instrument(skip_all)]
1190    pub fn get_user(&self) -> Option<User> {
1191        let user = self.user.lock().unwrap();
1192        user.clone()
1193    }
1194    // /// Return the signed in user, if we are signed in.
1195    // #[tracing::instrument(skip_all)]
1196    // pub fn get_user(&self) -> Option<User> {
1197    //     // let inner = self.inner.lock().await;
1198    //     // inner.user.clone()
1199    //     self.user.clone()
1200    // }
1201    
1202    /// Set the runtime flag to true or false
1203    #[tracing::instrument(skip_all)]
1204    pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1205        let mut current = self.runtime.lock().unwrap();
1206        *current = runtime;
1207    }
1208    /// Return value of the runtime
1209    #[tracing::instrument(skip_all)]
1210    // pub fn get_runtime(&self) -> Option<Arc<tokio::runtime::Runtime>> {
1211    pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1212        self.runtime.as_ref()
1213    }
1214    /// Return value of the runtime handle
1215    #[tracing::instrument(skip_all)]
1216    pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1217        let mut rt = self.runtime.lock().unwrap();
1218        if rt.is_none() {
1219            // println!("Rust: Initializing new Tokio runtime");
1220            let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1221            *rt = Some(runtime);
1222        } else {
1223            // println!("Rust: Runtime already initialized");
1224        }
1225        rt.as_ref().unwrap().handle().clone()
1226    }
1227    /// Method to allow the user to subscribe with a callback function
1228    #[tracing::instrument(skip_all)]
1229    pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1230    {
1231        // call the callback function every time there is an event in the client.event_receiver
1232        let event_receiver = self.event_receiver.clone();
1233        let callback = callback;
1234        let _handle =  tokio::task::spawn(async move {
1235            while let Ok(event) = event_receiver.recv().await {
1236                callback(event);
1237            }
1238        }); // .unwrap();
1239    }
1240    /// Internal function, used to generate a unique id for each message sent to the server.
1241    #[tracing::instrument(skip_all)]
1242    pub fn get_uniqueid() -> String {
1243        static COUNTER: AtomicUsize = AtomicUsize::new(1);
1244        let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1245        let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1246        let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1247        let sqids = Sqids::default();
1248        sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1249    }
1250    /// Internal function, Send a message to the OpenIAP server, and wait for a response.
1251    #[tracing::instrument(skip_all)]
1252    async fn send(&self, msg: Envelope, timeout: Option<tokio::time::Duration>) -> Result<Envelope, OpenIAPError> {
1253        let response = self.send_noawait(msg).await;
1254        match response {
1255            Ok((response_rx, id)) => {
1256                let timeout = match timeout {
1257                    Some(t) => t,
1258                    None => self.get_default_timeout()
1259                };
1260                let result = tokio::time::timeout(timeout, response_rx).await;
1261                // Remove the entry from `inner.queries` after awaiting
1262                let inner = self.inner.lock().await;
1263                inner.queries.lock().await.remove(&id);
1264
1265                match result {
1266                    Ok(Ok(response)) => Ok(response),
1267                    Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
1268                    Err(_) => Err(OpenIAPError::ClientError("Request timed out".to_string())),
1269                }
1270                // // Await the response
1271                // let response = response_rx.await;
1272    
1273                // // Remove the entry from `inner.queries` after awaiting
1274                // let inner = self.inner.lock().await;
1275                // inner.queries.lock().await.remove(&id);
1276    
1277                // // Handle the result of the await
1278                // match response {
1279                //     Ok(response) => Ok(response),
1280                //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1281                // }
1282            }
1283            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1284        }
1285    }
1286    /// Internal function, Send a message to the OpenIAP server, and do not wait for a response.
1287    /// used when sending a stream of data, or when we do not need a response.
1288    #[tracing::instrument(skip_all)]
1289    async fn send_noawait(
1290        &self,
1291        mut msg: Envelope,
1292    ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1293        let (response_tx, response_rx) = oneshot::channel();
1294        let id = Client::get_uniqueid();
1295        msg.id = id.clone();
1296    
1297        // Lock and insert the sender into `inner.queries`
1298        {
1299            let inner = self.inner.lock().await;
1300            inner.queries.lock().await.insert(id.clone(), response_tx);
1301        }
1302    
1303        // Send the message and check for errors
1304        let res = self.send_envelope(msg).await;
1305        if let Err(e) = res {
1306            // Remove the entry from `inner.queries` if the send fails
1307            let inner = self.inner.lock().await;
1308            inner.queries.lock().await.remove(&id);
1309            return Err(OpenIAPError::ClientError(e.to_string()));
1310        }
1311    
1312        Ok((response_rx, id))
1313    }
1314    /// Internal function, Setup a new stream, send a message to the OpenIAP server, and return a stream to send and receive data.
1315    #[tracing::instrument(skip_all)]
1316    async fn sendwithstream(
1317        &self,
1318        mut msg: Envelope,
1319    ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1320        let (response_tx, response_rx) = oneshot::channel();
1321        let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1322        let id = Client::get_uniqueid();
1323        msg.id = id.clone();
1324        {
1325            let inner = self.inner.lock().await;
1326            inner.queries.lock().await.insert(id.clone(), response_tx);
1327            inner.streams.lock().await.insert(id.clone(), stream_tx);
1328            let res = self.send_envelope(msg).await;
1329            match res {
1330                Ok(_) => (),
1331                Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1332            }
1333        }
1334        Ok((response_rx, stream_rx))
1335    }
1336    #[tracing::instrument(skip_all, target = "openiap::client")]
1337    async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1338        if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin ) 
1339            && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1340            return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1341        }
1342        let env = envelope.clone();
1343        let command = envelope.command.clone();
1344        self.stats.lock().unwrap().package_tx += 1;
1345        match command.as_str() {
1346            "signin" => { self.stats.lock().unwrap().signin += 1;},
1347            "upload" => { self.stats.lock().unwrap().upload += 1;},
1348            "download" => { self.stats.lock().unwrap().download += 1;},
1349            "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1350            "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1351            "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1352            "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1353            "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1354            "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1355            "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1356
1357            "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1358            "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1359            "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1360            "watch" => { self.stats.lock().unwrap().watch += 1;},
1361            "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1362            "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1363
1364            "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1365            "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1366            "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1367            "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1368            "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1369            "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1370            "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1371            "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1372
1373            "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1374            "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1375            "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1376            "query" => { self.stats.lock().unwrap().query += 1;},
1377            "count" => { self.stats.lock().unwrap().count += 1;},
1378            "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1379            "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1380            "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1381            "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1382            "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1383            "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1384            "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1385            "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1386            "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1387            "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1388            _ => {}
1389        };
1390        if envelope.id.is_empty() {
1391            let id = Client::get_uniqueid();
1392            envelope.id = id.clone();
1393        }
1394        trace!("Sending {} message, in the thread", command);
1395        let res = self.out_envelope_sender.send(env).await;
1396        if res.is_err() {
1397            error!("{:?}", res);
1398            let errmsg = res.unwrap_err().to_string();
1399            self.set_connected(ClientState::Disconnected, Some(&errmsg));
1400            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1401        } else {
1402            return Ok(())
1403        }
1404    }
1405    #[tracing::instrument(skip_all, target = "openiap::client")]
1406    async fn parse_incomming_envelope(&self, received: Envelope) {
1407        self.stats.lock().unwrap().package_rx += 1;
1408        let command = received.command.clone();
1409        trace!("parse_incomming_envelope, command: {}", command);
1410        let inner = self.inner.lock().await;
1411        let rid = received.rid.clone();
1412        let mut queries = inner.queries.lock().await;
1413        let mut streams = inner.streams.lock().await;
1414        let watches = inner.watches.lock().await;
1415        let queues = inner.queues.lock().await;
1416    
1417        if command != "ping" && command != "pong" && command != "refreshtoken" {
1418            if rid.is_empty() {
1419                debug!("Received #{} #{} {} message", received.seq, received.id, command);
1420            } else {
1421                debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1422            }
1423        } else if rid.is_empty() {
1424            trace!("Received #{} #{} {} message", received.seq, received.id, command);
1425        } else {
1426            trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1427        }
1428        
1429        if command == "ping" {
1430            self.pong(&received.id).await;
1431            // self.event_sender.send(crate::ClientEvent::Ping).await.unwrap();
1432        } else if command == "refreshtoken" {
1433            // TODO: Do we store the new jwt at some point in the future
1434        } else if command == "beginstream"
1435            || command == "stream"
1436            || command == "endstream"
1437        {
1438            let streamresponse: Stream =
1439                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1440            let streamdata = streamresponse.data;
1441            if !streamdata.is_empty() {
1442                let stream = streams.get(rid.as_str()).unwrap();
1443    
1444                match stream.send(streamdata).await {
1445                    Ok(_) => _ = (),
1446                    Err(e) => error!("Failed to send data: {}", e),
1447                }
1448            }
1449            if command == "endstream" {
1450                let _ = streams.remove(rid.as_str());
1451            }
1452        } else if command == "watchevent" {
1453            let watchevent: WatchEvent =
1454                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1455            if let Some(callback) = watches.get(watchevent.id.as_str()) {
1456                callback(watchevent);
1457            }
1458        } else if command == "queueevent" {
1459            let queueevent: QueueEvent =
1460                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1461            if let Some(callback) = queues.get(queueevent.queuename.as_str()).cloned() {
1462                let queuename = queueevent.replyto.clone();
1463                let correlation_id = queueevent.correlation_id.clone();
1464                let me = self.clone();
1465                tokio::spawn(async move {
1466                    let result_fut = callback(Arc::new(me.clone()), queueevent);
1467                    let result = result_fut.await;
1468                    if result.is_some() && !queuename.is_empty() {
1469                        debug!("Sending return value from queue event callback to {}", queuename);
1470                        let result = result.unwrap();
1471                        let q = QueueMessageRequest {
1472                            queuename,
1473                            correlation_id,
1474                            data: result,
1475                            striptoken: true,
1476                            ..Default::default()
1477                        };
1478                        let e = q.to_envelope();
1479                        let send_result = me.send(e, None).await;
1480                        if let Err(e) = send_result {
1481                            error!("Failed to send queue event response: {}", e);
1482                        }
1483                    }
1484                });
1485            }
1486        } else if let Some(response_tx) = queries.remove(&rid) {
1487            let stream = streams.get(rid.as_str());
1488            if let Some(stream) = stream {
1489                let streamdata = vec![];
1490                match stream.send(streamdata).await {
1491                    Ok(_) => _ = (),
1492                    Err(e) => error!("Failed to send data: {}", e),
1493                }
1494            }
1495            let _ = response_tx.send(received);
1496        } else {
1497            error!("Received unhandled {} message: {:?}", command, received);
1498        }    
1499    }    
1500    /// Internal function, used to send a fake getelement to the OpenIAP server.
1501    #[tracing::instrument(skip_all)]
1502    async fn get_element(&self) -> Result<(), OpenIAPError> {
1503        let id = Client::get_uniqueid();
1504        let envelope = Envelope {
1505            id: id.clone(),
1506            command: "getelement".into(),
1507            ..Default::default()
1508        };
1509        let result = match self.send(envelope, None).await {
1510            Ok(res) => res,
1511            Err(e) => {
1512                return Err(e);
1513            },
1514        };
1515        if result.command == "pong" || result.command == "getelement" {
1516            Ok(())
1517        } else if result.command == "error" {
1518            let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1519            Err(OpenIAPError::ServerError(e.message))
1520        } else {
1521            Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1522        }
1523    }
1524    /// Internal function, used to send a ping to the OpenIAP server.
1525    #[tracing::instrument(skip_all)]
1526    async fn ping(&self) -> Result<(), OpenIAPError> {
1527        let id = Client::get_uniqueid();
1528        let envelope = Envelope {
1529            id: id.clone(),
1530            command: "getelement".into(),
1531            ..Default::default()
1532        };
1533        match self.send_envelope(envelope).await {
1534            Ok(_res) => Ok(()),
1535            Err(e) => {
1536                return Err(e);
1537            },
1538        }
1539    }
1540    /// Internal function, used to send a pong response to the OpenIAP server.
1541    #[tracing::instrument(skip_all)]
1542    async fn pong(&self, rid: &str) {
1543        let id = Client::get_uniqueid();
1544        let envelope = Envelope {
1545            id: id.clone(),
1546            command: "pong".into(),
1547            rid: rid.to_string(),
1548            ..Default::default()
1549        };
1550        match self.send_envelope(envelope).await {
1551            Ok(_) => (),
1552            Err(e) => error!("Failed to send pong: {}", e),
1553        }
1554    }
1555    /// Sign in to the OpenIAP service. \
1556    /// If no username and password is provided, it will attempt to use environment variables.\
1557    /// if config is set to validateonly, it will only validate the credentials, but not sign in.\
1558    /// If no jwt, username and password is provided, it will attempt to use environment variables.\
1559    /// will prefere OPENIAP_JWT (or jwt) over OPENIAP_USERNAME and OPENIAP_PASSWORD.
1560    #[tracing::instrument(skip_all)]
1561    pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1562        // autodetect how to signin using environment variables
1563        if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1564            if config.jwt.is_empty() {
1565                config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1566            }
1567            if config.jwt.is_empty() {
1568                config.jwt = std::env::var("jwt").unwrap_or_default();
1569            }
1570            // if no jwt was found, test for username and password
1571            if config.jwt.is_empty() {
1572                if config.username.is_empty() {
1573                    config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1574                }
1575                if config.password.is_empty() {
1576                    config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1577                }
1578            }
1579        }
1580        let version = env!("CARGO_PKG_VERSION");
1581        if !version.is_empty() && config.version.is_empty() {
1582            config.version = version.to_string();
1583        }
1584        if config.agent.is_empty() {
1585            config.agent = self.get_agent_name();
1586        }
1587
1588        // trace!("Attempting sign-in using {:?}", config);
1589        let envelope = config.to_envelope();
1590        let result = self.send(envelope, None).await;
1591
1592        match &result {
1593            Ok(m) => {
1594                debug!("Sign-in reply received");
1595                if m.command == "error" {
1596                    let e: ErrorResponse =
1597                        prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1598                            .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1599                    return Err(OpenIAPError::ServerError(e.message));
1600                }
1601                debug!("Sign-in successful");
1602                let response: SigninResponse =
1603                    prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1604                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1605                if !config.validateonly {
1606                    self.set_connected(ClientState::Signedin, None);
1607                    self.set_user(Some(response.user.as_ref().unwrap().clone()));
1608                }
1609                Ok(response)
1610            }
1611            Err(e) => {
1612                debug!("Sending Sign-in request failed {:?}", result);
1613                debug!("Sign-in failed: {}", e.to_string());
1614                if !config.validateonly {
1615                    self.set_user(None);
1616                }
1617                Err(OpenIAPError::ClientError(e.to_string()))
1618            }
1619        }
1620    }
1621    /// Return a list of collections in the database
1622    /// - includehist: include historical collections, default is false.
1623    /// please see create_collection for examples on how to create collections.
1624    #[tracing::instrument(skip_all)]
1625    pub async fn list_collections(&self, includehist: bool) -> Result<String, OpenIAPError> {
1626        let config = ListCollectionsRequest::new(includehist);
1627        let envelope = config.to_envelope();
1628        let result = self.send(envelope, None).await;
1629        match result {
1630            Ok(m) => {
1631                let data = match m.data {
1632                    Some(data) => data,
1633                    None => {
1634                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1635                    }
1636                };
1637                if m.command == "error" {
1638                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1639                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1640                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1641                }
1642                let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1643                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1644                Ok(response.results)
1645            }
1646            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1647        }
1648    }
1649    /// Create a new collection in the database.
1650    /// You can create a collection by simply adding a new document to it using [Client::insert_one].
    /// Or you can create a collection using the following example:
1652    /// ```
1653    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1654    /// #[tokio::main]
1655    /// async fn main() -> Result<(), OpenIAPError> {
1656    ///     let client = Client::new_connect("").await?;
1657    ///     //let collections = client.list_collections(false).await?;
1658    ///     //println!("Collections: {}", collections);
1659    ///     let config = CreateCollectionRequest::byname("rusttestcollection");
1660    ///     client.create_collection(config).await?;
1661    ///     let config = DropCollectionRequest::byname("rusttestcollection");
1662    ///     client.drop_collection(config).await?;
1663    ///     Ok(())
1664    /// }
1665    /// ```
1666    /// You can create a normal collection with a TTL index on the _created field, using the following example:
1667    /// ```
1668    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1669    /// #[tokio::main]
1670    /// async fn main() -> Result<(), OpenIAPError> {
1671    ///     let client = Client::new_connect("").await?;
1672    ///     let config = CreateCollectionRequest::with_ttl(
1673    ///         "rusttestttlcollection",
1674    ///         60
1675    ///     );
1676    ///     client.create_collection(config).await?;
1677    ///     let config = DropCollectionRequest::byname("rusttestttlcollection");
1678    ///     client.drop_collection(config).await?;
1679    ///     Ok(())
1680    /// }
1681    /// ```
1682    /// You can create a time series collection using the following example:
1683    /// granularity can be one of: seconds, minutes, hours
1684    /// ```
1685    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1686    /// #[tokio::main]
1687    /// async fn main() -> Result<(), OpenIAPError> {
1688    ///     let client = Client::new_connect("").await?;
1689    ///     let config = CreateCollectionRequest::timeseries(
1690    ///         "rusttesttscollection2",
1691    ///         "_created",
1692    ///         "minutes"
1693    ///     );
1694    ///     client.create_collection(config).await?;
1695    ///     let config = DropCollectionRequest::byname("rusttesttscollection2");
1696    ///     client.drop_collection(config).await?;
1697    ///     Ok(())
1698    /// }
1699    /// ```
1700    #[tracing::instrument(skip_all)]
1701    pub async fn create_collection(
1702        &self,
1703        config: CreateCollectionRequest,
1704    ) -> Result<(), OpenIAPError> {
1705        if config.collectionname.is_empty() {
1706            return Err(OpenIAPError::ClientError(
1707                "No collection name provided".to_string(),
1708            ));
1709        }
1710        let envelope = config.to_envelope();
1711        let result = self.send(envelope, None).await;
1712        match result {
1713            Ok(m) => {
1714                let data = match m.data {
1715                    Some(data) => data,
1716                    None => {
1717                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1718                    }
1719                };
1720                if m.command == "error" {
1721                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1722                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1723                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1724                }
1725                Ok(())
1726            }
1727            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1728        }
1729    }
1730    /// Drop a collection from the database, this will delete all data and indexes for the collection.
1731    /// See [Client::create_collection] for examples on how to create a collection.
1732    #[tracing::instrument(skip_all)]
1733    pub async fn drop_collection(&self, config: DropCollectionRequest) -> Result<(), OpenIAPError> {
1734        if config.collectionname.is_empty() {
1735            return Err(OpenIAPError::ClientError(
1736                "No collection name provided".to_string(),
1737            ));
1738        }
1739        let envelope = config.to_envelope();
1740        let result = self.send(envelope, None).await;
1741        match result {
1742            Ok(m) => {
1743                let data = match m.data {
1744                    Some(data) => data,
1745                    None => {
1746                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1747                    }
1748                };
1749                if m.command == "error" {
1750                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1751                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1752                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1753                }
1754                Ok(())
1755            }
1756            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1757        }
1758    }
1759    /// Return all indexes for a collection in the database
1760    /// ```
1761    /// use openiap_client::{Client, GetIndexesRequest, OpenIAPError};
1762    /// #[tokio::main]
1763    /// async fn main() -> Result<(), OpenIAPError> {
1764    ///     let client = Client::new_connect("").await?;
1765    ///     let config = GetIndexesRequest::bycollectionname("rustindextestcollection");
1766    ///     let indexes = client.get_indexes(config).await?;
1767    ///     println!("Indexes: {}", indexes);
1768    ///     Ok(())
1769    /// }
1770    /// ```
1771    /// 
1772    pub async fn get_indexes(&self, config: GetIndexesRequest) -> Result<String, OpenIAPError> {
1773        if config.collectionname.is_empty() {
1774            return Err(OpenIAPError::ClientError(
1775                "No collection name provided".to_string(),
1776            ));
1777        }
1778        let envelope = config.to_envelope();
1779        let result = self.send(envelope, None).await;
1780        match result {
1781            Ok(m) => {
1782                let data = match m.data {
1783                    Some(data) => data,
1784                    None => {
1785                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1786                    }
1787                };
1788                if m.command == "error" {
1789                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1790                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1791                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1792                }
1793                let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1794                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1795                Ok(response.results)
1796            }
1797            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1798        }
1799    }
1800    /// Create an index in the database.
1801    /// Example of creating an index on the name field in the rustindextestcollection collection, and then dropping it again:
1802    /// ```
1803    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1804    /// #[tokio::main]
1805    /// async fn main() -> Result<(), OpenIAPError> {
1806    ///     let client = Client::new_connect("").await?;
1807    ///     let config = CreateIndexRequest::bycollectionname(
1808    ///         "rustindextestcollection",
1809    ///         "{\"name\": 1}"
1810    ///     );
1811    ///     client.create_index(config).await?;
1812    ///     let config = DropIndexRequest::bycollectionname(
1813    ///         "rustindextestcollection",
1814    ///         "name_1"
1815    ///     );
1816    ///     client.drop_index(config).await?;
1817    ///     Ok(())
1818    /// }
1819    /// ```
1820    /// Example of creating an unique index on the address field in the rustindextestcollection collection, and then dropping it again:
1821    /// ```
1822    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1823    /// #[tokio::main]
1824    /// async fn main() -> Result<(), OpenIAPError> {
1825    ///     let client = Client::new_connect("").await?;
1826    ///     let mut config = CreateIndexRequest::bycollectionname(
1827    ///         "rustindextestcollection",
1828    ///         "{\"address\": 1}"
1829    ///     );
1830    ///     config.options = "{\"unique\": true}".to_string();
1831    ///     client.create_index(config).await?;
1832    ///     let config = DropIndexRequest::bycollectionname(
1833    ///         "rustindextestcollection",
1834    ///         "address_1"
1835    ///     );
1836    ///     client.drop_index(config).await?;
1837    ///     Ok(())
1838    /// }
1839    /// ```
1840    pub async fn create_index(&self, config: CreateIndexRequest) -> Result<(), OpenIAPError> {
1841        if config.collectionname.is_empty() {
1842            return Err(OpenIAPError::ClientError(
1843                "No collection name provided".to_string(),
1844            ));
1845        }
1846        if config.index.is_empty() {
1847            return Err(OpenIAPError::ClientError(
1848                "No index was provided".to_string(),
1849            ));
1850        }
1851        let envelope = config.to_envelope();
1852        let result = self.send(envelope, None).await;
1853        match result {
1854            Ok(m) => {
1855                let data = match m.data {
1856                    Some(data) => data,
1857                    None => {
1858                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1859                    }
1860                };
1861                if m.command == "error" {
1862                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1863                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1864                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1865                }
1866                Ok(())
1867            }
1868            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1869        }
1870    }
1871    /// Drop an index from the database
1872    /// See [Client::create_index] for an example on how to create and drop an index.
1873    pub async fn drop_index(&self, config: DropIndexRequest) -> Result<(), OpenIAPError> {
1874        if config.collectionname.is_empty() {
1875            return Err(OpenIAPError::ClientError(
1876                "No collection name provided".to_string(),
1877            ));
1878        }
1879        if config.name.is_empty() {
1880            return Err(OpenIAPError::ClientError(
1881                "No index name provided".to_string(),
1882            ));
1883        }
1884        let envelope = config.to_envelope();
1885        let result = self.send(envelope, None).await;
1886        match result {
1887            Ok(m) => {
1888                let data = match m.data {
1889                    Some(data) => data,
1890                    None => {
1891                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1892                    }
1893                };
1894                if m.command == "error" {
1895                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1896                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1897                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1898                }
1899                Ok(())
1900            }
1901            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1902        }
1903    }
1904    /// To query all documents in the entities collection where _type is test, you can use the following example:
1905    /// ```
1906    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1907    /// #[tokio::main]
1908    /// async fn main() -> Result<(), OpenIAPError> {
1909    ///     let client = Client::new_connect("").await?;
1910    ///     let q = client.query( QueryRequest::with_query(
1911    ///         "entities",
1912    ///         "{\"_type\":\"test\"}"
1913    ///     )).await?;
1914    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1915    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1916    ///     for item in items {
1917    ///         println!("Item: {:?}", item);
1918    ///     }
1919    ///     Ok(())
1920    /// }
1921    /// ```
1922    /// To query all documents in the entities collection, and only return the name and _id field for all documents, you can use the following example:
1923    /// ```
1924    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1925    /// #[tokio::main]
1926    /// async fn main() -> Result<(), OpenIAPError> {
1927    ///     let client = Client::new_connect("").await?;
1928    ///     let q = client.query( QueryRequest::with_projection(
1929    ///         "entities",
1930    ///         "{}",
1931    ///         "{\"name\":1}"
1932    ///     )).await?;
1933    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1934    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1935    ///     for item in items {
1936    ///         println!("Item: {:?}", item);
1937    ///     }
1938    ///     Ok(())
1939    /// }
1940    /// ```
1941    #[tracing::instrument(skip_all)]
1942    pub async fn query(&self, mut config: QueryRequest) -> Result<QueryResponse, OpenIAPError> {
1943        if config.collectionname.is_empty() {
1944            config.collectionname = "entities".to_string();
1945        }
1946
1947        let envelope = config.to_envelope();
1948        let result = self.send(envelope, None).await;
1949        match result {
1950            Ok(m) => {
1951                let data = match m.data {
1952                    Some(data) => data,
1953                    None => {
1954                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1955                    }
1956                };
1957                if m.command == "error" {
1958                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1959                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1960                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1961                }
1962                let response: QueryResponse = prost::Message::decode(data.value.as_ref())
1963                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1964                debug!("Return Ok(response)");
1965                Ok(response)
1966            }
1967            Err(e) => {
1968                debug!("Error !!");
1969                Err(OpenIAPError::ClientError(e.to_string()))
1970            }
1971        }
1972    }
1973    /// Try and get a single document from the database.\
1974    /// If no document is found, it will return None.
1975    /// ```
1976    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1977    /// #[tokio::main]
1978    /// async fn main() -> Result<(), OpenIAPError> {
1979    ///     let client = Client::new_connect("").await?;
1980    ///     let config = QueryRequest::with_query(
1981    ///         "users",
1982    ///         "{\"username\":\"guest\"}"
1983    ///     );
1984    ///     let item = client.get_one(config).await;
1985    ///     match item {
1986    ///         Some(item) => {
1987    ///             assert_eq!(item["username"], "guest");
1988    ///             println!("Item: {:?}", item);
1989    ///         }
1990    ///         None => {
1991    ///             println!("No item found");
1992    ///             assert!(false, "No item found");
1993    ///         }
1994    ///     }
1995    ///     Ok(())
1996    /// }
1997    /// ```
1998    #[tracing::instrument(skip_all)]
1999    pub async fn get_one(&self, mut config: QueryRequest) -> Option<serde_json::Value> {
2000        if config.collectionname.is_empty() {
2001            config.collectionname = "entities".to_string();
2002        }
2003        config.top = 1;
2004        let envelope = config.to_envelope();
2005        let result = self.send(envelope, None).await;
2006        match result {
2007            Ok(m) => {
2008                let data = match m.data {
2009                    Some(data) => data,
2010                    None => return None,
2011                };
2012                if m.command == "error" {
2013                    return None;
2014                }
2015                let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
2016
2017                let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
2018                let items: &Vec<serde_json::Value> = items.as_array().unwrap();
2019                if items.is_empty() {
2020                    return None;
2021                }
2022                let item = items[0].clone();
2023                Some(item)
2024            }
2025            Err(_) => None,
2026        }
2027    }
2028
    /// Try and get a specific version of a document from the database, reconstructing it from the history collection
2030    /// ```
2031    /// use openiap_client::{OpenIAPError, Client, GetDocumentVersionRequest, InsertOneRequest, UpdateOneRequest};
2032    /// #[tokio::main]
2033    /// async fn main() -> Result<(), OpenIAPError> {
2034    ///     let client = Client::new_connect("").await?;
2035    ///     let item = "{\"name\": \"test from rust\", \"_type\": \"test\"}";
2036    ///     let query = InsertOneRequest {
2037    ///         collectionname: "entities".to_string(),
2038    ///         item: item.to_string(),
2039    ///         j: true,
2040    ///         w: 2,
2041    ///         ..Default::default()
2042    ///     };
2043    ///     let response = client.insert_one(query).await;
2044    ///     let response = match response {
2045    ///         Ok(r) => r,
2046    ///         Err(e) => {
2047    ///             println!("Error: {:?}", e);
2048    ///             assert!(false, "insert_one failed with {:?}", e);
2049    ///             return Ok(());
2050    ///         }
2051    ///     };
2052    ///     let _obj: serde_json::Value = serde_json::from_str(&response.result).unwrap();
2053    ///     let _id = _obj["_id"].as_str().unwrap();
2054    ///     let item = format!("{{\"name\":\"updated from rust\", \"_id\": \"{}\"}}", _id);
2055    ///     let query = UpdateOneRequest {
2056    ///         collectionname: "entities".to_string(),
2057    ///         item: item.to_string(),
2058    ///         ..Default::default()
2059    ///     };
2060    ///     let response = client.update_one(query).await;
2061    ///     _ = match response {
2062    ///         Ok(r) => r,
2063    ///         Err(e) => {
2064    ///             println!("Error: {:?}", e);
2065    ///             assert!(false, "update_one failed with {:?}", e);
2066    ///             return Ok(());
2067    ///         }
2068    ///     };
2069    ///     let query = GetDocumentVersionRequest {
2070    ///         collectionname: "entities".to_string(),
2071    ///         id: _id.to_string(),
2072    ///         version: 0,
2073    ///         ..Default::default()
2074    ///     };
2075    ///     let response = client.get_document_version(query).await;
2076    ///     let response = match response {
2077    ///         Ok(r) => r,
2078    ///         Err(e) => {
2079    ///             println!("Error: {:?}", e);
2080    ///             assert!(false, "get_document_version failed with {:?}", e);
2081    ///             return Ok(());
2082    ///         }
2083    ///     };
2084    ///     let _obj = serde_json::from_str(&response);
2085    ///     let _obj: serde_json::Value = match _obj {
2086    ///         Ok(r) => r,
2087    ///         Err(e) => {
2088    ///             println!("Error: {:?}", e);
2089    ///             assert!(
2090    ///                 false,
2091    ///                 "parse get_document_version result failed with {:?}",
2092    ///                 e
2093    ///             );
2094    ///             return Ok(());
2095    ///         }
2096    ///     };
2097    ///     let name = _obj["name"].as_str().unwrap();
2098    ///     let version = _obj["_version"].as_i64().unwrap();
2099    ///     println!("version 0 Name: {}, Version: {}", name, version);
2100    ///     assert_eq!(name, "test from rust");
2101    ///     let query = GetDocumentVersionRequest {
2102    ///         collectionname: "entities".to_string(),
2103    ///         id: _id.to_string(),
2104    ///         version: 1,
2105    ///         ..Default::default()
2106    ///     };
2107    ///     let response = client.get_document_version(query).await;
2108    ///     assert!(
2109    ///         response.is_ok(),
2110    ///         "test_get_document_version failed with {:?}",
2111    ///         response.err().unwrap()
2112    ///     );
2113    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2114    ///     let name = _obj["name"].as_str().unwrap();
2115    ///     let version = _obj["_version"].as_i64().unwrap();
2116    ///     println!("version 1 Name: {}, Version: {}", name, version);
2117    ///     assert_eq!(name, "updated from rust");
2118    ///     let query = GetDocumentVersionRequest {
2119    ///         collectionname: "entities".to_string(),
2120    ///         id: _id.to_string(),
2121    ///         version: -1,
2122    ///         ..Default::default()
2123    ///     };
2124    ///     let response = client.get_document_version(query).await;
2125    ///     assert!(
2126    ///         response.is_ok(),
2127    ///         "test_get_document_version failed with {:?}",
2128    ///         response.err().unwrap()
2129    ///     );
2130    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2131    ///     let name = _obj["name"].as_str().unwrap();
2132    ///     let version = _obj["_version"].as_i64().unwrap();
2133    ///     println!("version -1 Name: {}, Version: {}", name, version);
2134    ///     assert_eq!(name, "updated from rust");
2135    ///     Ok(())
2136    /// }
2137    /// ```
2138    #[tracing::instrument(skip_all)]
2139    pub async fn get_document_version(
2140        &self,
2141        mut config: GetDocumentVersionRequest,
2142    ) -> Result<String, OpenIAPError> {
2143        if config.collectionname.is_empty() {
2144            config.collectionname = "entities".to_string();
2145        }
2146        if config.id.is_empty() {
2147            return Err(OpenIAPError::ClientError("No id provided".to_string()));
2148        }
2149        let envelope = config.to_envelope();
2150        let result = self.send(envelope, None).await;
2151        match result {
2152            Ok(m) => {
2153                let data = match m.data {
2154                    Some(data) => data,
2155                    None => {
2156                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2157                    }
2158                };
2159                if m.command == "error" {
2160                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2161                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2162                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2163                }
2164                let response: GetDocumentVersionResponse =
2165                    prost::Message::decode(data.value.as_ref())
2166                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2167                Ok(response.result)
2168            }
2169            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2170        }
2171    }
2172    /// Run an aggregate pipeline towards the database
2173    /// Example of running an aggregate pipeline on the entities collection, counting the number of documents with _type=test, and grouping them by name:
2174    /// ```
2175    /// use openiap_client::{OpenIAPError, Client, AggregateRequest};
2176    /// #[tokio::main]
2177    /// async fn main() -> Result<(), OpenIAPError> {
2178    ///    let client = Client::new_connect("").await?;
2179    ///     let config = AggregateRequest {
2180    ///         collectionname: "entities".to_string(),
2181    ///         aggregates: "[{\"$match\": {\"_type\": \"test\"}}, {\"$group\": {\"_id\": \"$name\", \"count\": {\"$sum\": 1}}}]".to_string(),
2182    ///         ..Default::default()
2183    ///     };
2184    ///     let response = client.aggregate(config).await?;
2185    ///     println!("Response: {:?}", response);
2186    ///     Ok(())
2187    /// }
2188    /// ```
2189    /// 
2190    #[tracing::instrument(skip_all)]
2191    pub async fn aggregate(
2192        &self,
2193        mut config: AggregateRequest,
2194    ) -> Result<AggregateResponse, OpenIAPError> {
2195        if config.collectionname.is_empty() {
2196            config.collectionname = "entities".to_string();
2197        }
2198        if config.hint.is_empty() {
2199            config.hint = "".to_string();
2200        }
2201        if config.queryas.is_empty() {
2202            config.queryas = "".to_string();
2203        }
2204        if config.aggregates.is_empty() {
2205            return Err(OpenIAPError::ClientError(
2206                "No aggregates provided".to_string(),
2207            ));
2208        }
2209        let envelope = config.to_envelope();
2210        let result = self.send(envelope, None).await;
2211        match result {
2212            Ok(m) => {
2213                let data = match m.data {
2214                    Some(data) => data,
2215                    None => {
2216                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2217                    }
2218                };
2219                if m.command == "error" {
2220                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2221                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2222                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2223                }
2224                let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2225                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2226                Ok(response)
2227            }
2228            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2229        }
2230    }
2231    /// Count the number of documents in a collection, with an optional query
2232    #[tracing::instrument(skip_all)]
2233    pub async fn count(&self, mut config: CountRequest) -> Result<CountResponse, OpenIAPError> {
2234        if config.collectionname.is_empty() {
2235            config.collectionname = "entities".to_string();
2236        }
2237        if config.query.is_empty() {
2238            config.query = "{}".to_string();
2239        }
2240        let envelope = config.to_envelope();
2241        let result = self.send(envelope, None).await;
2242        match result {
2243            Ok(m) => {
2244                let data = match m.data {
2245                    Some(data) => data,
2246                    None => {
2247                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2248                    }
2249                };
2250                if m.command == "error" {
2251                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2252                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2253                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2254                }
2255                let response: CountResponse = prost::Message::decode(data.value.as_ref())
2256                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2257                Ok(response)
2258            }
2259            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2260        }
2261    }
2262    /// Get distinct values for a field in a collection, with an optional query
2263    #[tracing::instrument(skip_all)]
2264    pub async fn distinct(
2265        &self,
2266        mut config: DistinctRequest,
2267    ) -> Result<DistinctResponse, OpenIAPError> {
2268        if config.collectionname.is_empty() {
2269            config.collectionname = "entities".to_string();
2270        }
2271        if config.query.is_empty() {
2272            config.query = "{}".to_string();
2273        }
2274        if config.field.is_empty() {
2275            return Err(OpenIAPError::ClientError("No field provided".to_string()));
2276        }
2277        let envelope = config.to_envelope();
2278        let result = self.send(envelope, None).await;
2279        match result {
2280            Ok(m) => {
2281                let data = match m.data {
2282                    Some(data) => data,
2283                    None => {
2284                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2285                    }
2286                };
2287                if m.command == "error" {
2288                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2289                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2290                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2291                }
2292                let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2293                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2294                Ok(response)
2295            }
2296            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2297        }
2298    }
2299    /// Insert a document into a collection
2300    #[tracing::instrument(skip_all)]
2301    pub async fn insert_one(
2302        &self,
2303        config: InsertOneRequest,
2304    ) -> Result<InsertOneResponse, OpenIAPError> {
2305        let envelope = config.to_envelope();
2306        let result = self.send(envelope, None).await;
2307        match result {
2308            Ok(m) => {
2309                let data = match m.data {
2310                    Some(data) => data,
2311                    None => {
2312                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2313                    }
2314                };
2315                if m.command == "error" {
2316                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2317                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2318                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2319                }
2320                let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2321                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2322                Ok(response)
2323            }
2324            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2325        }
2326    }
2327    /// Insert many documents into a collection
2328    #[tracing::instrument(skip_all)]
2329    pub async fn insert_many(
2330        &self,
2331        config: InsertManyRequest,
2332    ) -> Result<InsertManyResponse, OpenIAPError> {
2333        let envelope = config.to_envelope();
2334        let result = self.send(envelope, None).await;
2335        match result {
2336            Ok(m) => {
2337                let data = match m.data {
2338                    Some(data) => data,
2339                    None => {
2340                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2341                    }
2342                };
2343                if m.command == "error" {
2344                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2345                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2346                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2347                }
2348                let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2349                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2350                Ok(response)
2351            }
2352            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2353        }
2354    }
2355    /// Update ( replace ) a document in a collection
2356    #[tracing::instrument(skip_all)]
2357    pub async fn update_one(
2358        &self,
2359        config: UpdateOneRequest,
2360    ) -> Result<UpdateOneResponse, OpenIAPError> {
2361        let envelope = config.to_envelope();
2362        let result = self.send(envelope, None).await;
2363        match result {
2364            Ok(m) => {
2365                let data = match m.data {
2366                    Some(data) => data,
2367                    None => {
2368                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2369                    }
2370                };
2371                if m.command == "error" {
2372                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2373                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2374                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2375                }
2376                let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2377                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2378                Ok(response)
2379            }
2380            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2381        }
2382    }
2383    /// Using a unique key, insert a document or update it if it already exists ( upsert on steroids )
2384    #[tracing::instrument(skip_all)]
2385    pub async fn insert_or_update_one(
2386        &self,
2387        config: InsertOrUpdateOneRequest,
2388    ) -> Result<String, OpenIAPError> {
2389        let envelope = config.to_envelope();
2390        let result = self.send(envelope, None).await;
2391        match result {
2392            Ok(m) => {
2393                let data = match m.data {
2394                    Some(data) => data,
2395                    None => {
2396                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2397                    }
2398                };
2399                if m.command == "error" {
2400                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2401                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2402                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2403                }
2404                let response: InsertOrUpdateOneResponse =
2405                    prost::Message::decode(data.value.as_ref())
2406                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2407                Ok(response.result)
2408            }
2409            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2410        }
2411    }
2412    /// Using a unique key, insert many documents or update them if they already exist ( upsert on steroids )
2413    #[tracing::instrument(skip_all)]
2414    pub async fn insert_or_update_many(
2415        &self,
2416        config: InsertOrUpdateManyRequest,
2417    ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2418        let envelope = config.to_envelope();
2419        let result = self.send(envelope, None).await;
2420        match result {
2421            Ok(m) => {
2422                let data = match m.data {
2423                    Some(data) => data,
2424                    None => {
2425                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2426                    }
2427                };
2428                if m.command == "error" {
2429                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2430                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2431                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2432                }
2433                let response: InsertOrUpdateManyResponse =
2434                    prost::Message::decode(data.value.as_ref())
2435                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2436                Ok(response)
2437            }
2438            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2439        }
2440    }
2441    /// Update one or more documents in a collection using a update document
2442    #[tracing::instrument(skip_all)]
2443    pub async fn update_document(
2444        &self,
2445        config: UpdateDocumentRequest,
2446    ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2447        let envelope = config.to_envelope();
2448        let result = self.send(envelope, None).await;
2449        match result {
2450            Ok(m) => {
2451                let data = match m.data {
2452                    Some(data) => data,
2453                    None => {
2454                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2455                    }
2456                };
2457                if m.command == "error" {
2458                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2459                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2460                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2461                }
2462                let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2463                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2464                Ok(response)
2465            }
2466            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2467        }
2468    }
2469    /// Delete a document from a collection using a unique key
2470    #[tracing::instrument(skip_all)]
2471    pub async fn delete_one(&self, config: DeleteOneRequest) -> Result<i32, OpenIAPError> {
2472        let envelope = config.to_envelope();
2473        let result = self.send(envelope, None).await;
2474        match result {
2475            Ok(m) => {
2476                let data = match m.data {
2477                    Some(data) => data,
2478                    None => {
2479                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2480                    }
2481                };
2482                if m.command == "error" {
2483                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2484                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2485                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2486                }
2487                let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2488                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2489                Ok(response.affectedrows)
2490            }
2491            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2492        }
2493    }
2494    /// Delete many documents from a collection using a query or list of unique keys
2495    #[tracing::instrument(skip_all)]
2496    pub async fn delete_many(&self, config: DeleteManyRequest) -> Result<i32, OpenIAPError> {
2497        let envelope = config.to_envelope();
2498        let result = self.send(envelope, None).await;
2499        match result {
2500            Ok(m) => {
2501                let data = match m.data {
2502                    Some(data) => data,
2503                    None => {
2504                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2505                    }
2506                };
2507                if m.command == "error" {
2508                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2509                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2510                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2511                }
2512                let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2513                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2514                Ok(response.affectedrows)
2515            }
2516            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2517        }
2518    }
2519    /// Download a file from the database
2520    #[tracing::instrument(skip_all)]
2521    pub async fn download(
2522        &self,
2523        config: DownloadRequest,
2524        folder: Option<&str>,
2525        filename: Option<&str>,
2526    ) -> Result<DownloadResponse, OpenIAPError> {
2527        let envelope = config.to_envelope();
2528        match self.sendwithstream(envelope).await {
2529            Ok((response_rx, mut stream_rx)) => {
2530                let temp_file_path = util::generate_unique_filename("openiap");
2531                debug!("Temp file: {:?}", temp_file_path);
2532                let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2533                    OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2534                })?;
2535                while !stream_rx.is_closed() {
2536                    match stream_rx.recv().await {
2537                        Some(received) => {
2538                            if received.is_empty() {
2539                                debug!("Stream closed");
2540                                break;
2541                            }
2542                            debug!("Received {} bytes", received.len());
2543                            temp_file.write_all(&received).map_err(|e| {
2544                                OpenIAPError::ClientError(format!(
2545                                    "Failed to write to temp file: {}",
2546                                    e
2547                                ))
2548                            })?;
2549                        }
2550                        None => {
2551                            debug!("Stream closed");
2552                            break;
2553                        }
2554                    }
2555                }
2556                temp_file.sync_all().map_err(|e| {
2557                    OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2558                })?;
2559
2560                let response = response_rx.await.map_err(|_| {
2561                    OpenIAPError::ClientError("Failed to receive response".to_string())
2562                })?;
2563
2564                if response.command == "error" {
2565                    let data = match response.data {
2566                        Some(data) => data,
2567                        None => {
2568                            return Err(OpenIAPError::ClientError(
2569                                "No data returned for SERVER error".to_string(),
2570                            ));
2571                        }
2572                    };
2573                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2574                    return Err(OpenIAPError::ServerError(e.message));
2575                }
2576                let mut downloadresponse: DownloadResponse =
2577                    prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2578
2579                let mut final_filename = match &filename {
2580                    Some(f) => f,
2581                    None => downloadresponse.filename.as_str(),
2582                };
2583                if final_filename.is_empty() {
2584                    final_filename = downloadresponse.filename.as_str();
2585                }
2586                let mut folder = match &folder {
2587                    Some(f) => f,
2588                    None => ".",
2589                };
2590                if folder.is_empty() {
2591                    folder = ".";
2592                }
2593                let filepath = format!("{}/{}", folder, final_filename);
2594                trace!("Moving file to {}", filepath);
2595                util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2596                    OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2597                })?;
2598                debug!("Downloaded file to {}", filepath);
2599                downloadresponse.filename = filepath;
2600
2601                Ok(downloadresponse)
2602            }
2603            Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2604        }
2605    }
2606    /// Upload a file to the database
2607    #[tracing::instrument(skip_all)]
2608    pub async fn upload(
2609        &self,
2610        config: UploadRequest,
2611        filepath: &str,
2612    ) -> Result<UploadResponse, OpenIAPError> {
2613        // debug!("upload: Uploading file: {}", filepath);
2614        // let mut file = File::open(filepath)
2615        //     .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2616        // let chunk_size = 1024 * 1024;
2617        // let mut buffer = vec![0; chunk_size];
2618
2619        // let envelope = config.to_envelope();
2620        // let (response_rx, rid) = self.send_noawait(envelope).await?;
2621        // {
2622        //     let envelope = BeginStream::from_rid(rid.clone());
2623        //     debug!("Sending beginstream to #{}", rid);
2624        //     self.send_envelope(envelope).await.map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2625        //     let mut counter = 0;
2626
2627        //     loop {
2628        //         let bytes_read = file.read(&mut buffer).map_err(|e| {
2629        //             OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2630        //         })?;
2631        //         counter += 1;
2632
2633        //         if bytes_read == 0 {
2634        //             break;
2635        //         }
2636
2637        //         let chunk = buffer[..bytes_read].to_vec();
2638        //         let envelope = Stream::from_rid(chunk, rid.clone());
2639        //         debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2640        //         self.send_envelope(envelope).await.map_err(|e| {
2641        //             OpenIAPError::ClientError(format!("Failed to send data: {}", e))
2642        //         })?
2643        //     }
2644
2645        //     let envelope = EndStream::from_rid(rid.clone());
2646        //     debug!("Sending endstream to #{}", rid);
2647        //     self.send_envelope(envelope).await
2648        //         .map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2649        // }
2650
2651        // debug!("Wait for upload response for #{}", rid);
2652        // match response_rx.await {
2653        //     Ok(response) => {
2654        //         if response.command == "error" {
2655        //             let error_response: ErrorResponse = prost::Message::decode(
2656        //                 response.data.unwrap().value.as_ref(),
2657        //             )
2658        //             .map_err(|e| {
2659        //                 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2660        //             })?;
2661        //             return Err(OpenIAPError::ServerError(error_response.message));
2662        //         }
2663        //         let upload_response: UploadResponse =
2664        //             prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2665        //                 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2666        //             })?;
2667        //         Ok(upload_response)
2668        //     }
2669        //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2670        // }
2671        debug!("upload: Uploading file: {}", filepath);
2672        let mut file = File::open(filepath)
2673            .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2674        let chunk_size = 1024 * 1024;
2675        let mut buffer = vec![0; chunk_size];
2676    
2677        // Send the initial upload request
2678        let envelope = config.to_envelope();
2679        let (response_rx, rid) = self.send_noawait(envelope).await?;
2680        
2681        // Send the BeginStream message
2682        let envelope = BeginStream::from_rid(rid.clone());
2683        debug!("Sending beginstream to #{}", rid);
2684        if let Err(e) = self.send_envelope(envelope).await {
2685            let inner = self.inner.lock().await;
2686            inner.queries.lock().await.remove(&rid);
2687            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2688        }
2689    
2690        // Send file chunks
2691        let mut counter = 0;
2692        loop {
2693            let bytes_read = file.read(&mut buffer).map_err(|e| {
2694                OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2695            })?;
2696            counter += 1;
2697    
2698            if bytes_read == 0 {
2699                break;
2700            }
2701    
2702            let chunk = buffer[..bytes_read].to_vec();
2703            let envelope = Stream::from_rid(chunk, rid.clone());
2704            debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2705            if let Err(e) = self.send_envelope(envelope).await {
2706                let inner = self.inner.lock().await;
2707                inner.queries.lock().await.remove(&rid);
2708                return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2709            }
2710        }
2711    
2712        // Send the EndStream message
2713        let envelope = EndStream::from_rid(rid.clone());
2714        debug!("Sending endstream to #{}", rid);
2715        if let Err(e) = self.send_envelope(envelope).await {
2716            let inner = self.inner.lock().await;
2717            inner.queries.lock().await.remove(&rid);
2718            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2719        }
2720    
2721        // Await the response and clean up `inner.queries` afterward
2722        debug!("Wait for upload response for #{}", rid);
2723        let result = response_rx.await;
2724        let inner = self.inner.lock().await;
2725        inner.queries.lock().await.remove(&rid);
2726    
2727        match result {
2728            Ok(response) => {
2729                if response.command == "error" {
2730                    let error_response: ErrorResponse = prost::Message::decode(
2731                        response.data.unwrap().value.as_ref(),
2732                    )
2733                    .map_err(|e| {
2734                        OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2735                    })?;
2736                    return Err(OpenIAPError::ServerError(error_response.message));
2737                }
2738                let upload_response: UploadResponse =
2739                    prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2740                        OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2741                    })?;
2742                Ok(upload_response)
2743            }
2744            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2745        }
2746    }
2747    /// Watch for changes in a collection ( change stream )
2748    #[tracing::instrument(skip_all)]
2749    pub async fn watch(
2750        &self,
2751        mut config: WatchRequest,
2752        callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2753    ) -> Result<String, OpenIAPError> {
2754        if config.collectionname.is_empty() {
2755            config.collectionname = "entities".to_string();
2756        }
2757        if config.paths.is_empty() {
2758            config.paths = vec!["".to_string()];
2759        }
2760
2761        let envelope = config.to_envelope();
2762        let result = self.send(envelope, None).await;
2763        match result {
2764            Ok(m) => {
2765                let data = match m.data {
2766                    Some(data) => data,
2767                    None => {
2768                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2769                    }
2770                };
2771                if m.command == "error" {
2772                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2773                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2774                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2775                }
2776                let response: WatchResponse = prost::Message::decode(data.value.as_ref())
2777                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2778
2779                let inner = self.inner.lock().await;
2780                inner
2781                    .watches
2782                    .lock()
2783                    .await
2784                    .insert(response.id.clone(), callback);
2785
2786                Ok(response.id)
2787            }
2788            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2789        }
2790    }
2791    /// Cancel a watch ( change stream )
2792    #[tracing::instrument(skip_all)]
2793    pub async fn unwatch(&self, id: &str) -> Result<(), OpenIAPError> {
2794        let config = UnWatchRequest::byid(id);
2795        let envelope = config.to_envelope();
2796        let result = self.send(envelope, None).await;
2797        match result {
2798            Ok(m) => {
2799                let data = match m.data {
2800                    Some(data) => data,
2801                    None => {
2802                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2803                    }
2804                };
2805                if m.command == "error" {
2806                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2807                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2808                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2809                }
2810                Ok(())
2811            }
2812            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2813        }
2814    }
2815    /// Register a queue for messaging ( amqp ) in the OpenIAP service
2816    #[tracing::instrument(skip_all)]
2817    pub async fn register_queue(
2818        &self,
2819        mut config: RegisterQueueRequest,
2820        callback: QueueCallbackFn,
2821    ) -> Result<String, OpenIAPError> {
2822        if config.queuename.is_empty() {
2823            config.queuename = "".to_string();
2824        }
2825
2826        let envelope = config.to_envelope();
2827        let result = self.send(envelope, None).await;
2828        match result {
2829            Ok(m) => {
2830                let data = match m.data {
2831                    Some(data) => data,
2832                    None => {
2833                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2834                    }
2835                };
2836                if m.command == "error" {
2837                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2838                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2839                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2840                }
2841                let response: RegisterQueueResponse =
2842                    prost::Message::decode(data.value.as_ref())
2843                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2844
2845                let inner = self.inner.lock().await;
2846                inner
2847                    .queues
2848                    .lock()
2849                    .await
2850                    .insert(response.queuename.clone(), callback);
2851
2852                Ok(response.queuename)
2853            }
2854            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2855        }
2856    }
2857    /// Unregister a queue or exchange for messaging ( amqp ) in the OpenIAP service
2858    #[tracing::instrument(skip_all)]
2859    pub async fn unregister_queue(&self, queuename: &str) -> Result<(), OpenIAPError> {
2860        let config = UnRegisterQueueRequest::byqueuename(queuename);
2861        let envelope = config.to_envelope();
2862        let result = self.send(envelope, None).await;
2863        match result {
2864            Ok(m) => {
2865                let data = match m.data {
2866                    Some(data) => data,
2867                    None => {
2868                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2869                    }
2870                };
2871                if m.command == "error" {
2872                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2873                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2874                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2875                }
2876                Ok(())
2877            }
2878            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2879        }
2880    }
2881    /// Register a exchange for messaging ( amqp ) in the OpenIAP service
2882    #[tracing::instrument(skip_all)]
2883    pub async fn register_exchange(
2884        &self,
2885        mut config: RegisterExchangeRequest,
2886        callback: QueueCallbackFn,
2887    ) -> Result<String, OpenIAPError> {
2888        if config.exchangename.is_empty() {
2889            return Err(OpenIAPError::ClientError(
2890                "No exchange name provided".to_string(),
2891            ));
2892        }
2893        if config.algorithm.is_empty() {
2894            config.algorithm = "fanout".to_string();
2895        }
2896        let envelope = config.to_envelope();
2897        let result = self.send(envelope, None).await;
2898        match result {
2899            Ok(m) => {
2900                let data = match m.data {
2901                    Some(data) => data,
2902                    None => {
2903                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2904                    }
2905                };
2906                if m.command == "error" {
2907                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2908                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2909                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2910                }
2911                let response: RegisterExchangeResponse =
2912                    prost::Message::decode(data.value.as_ref())
2913                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2914                if !response.queuename.is_empty() {
2915                    let inner = self.inner.lock().await;
2916                    inner
2917                        .queues
2918                        .lock()
2919                        .await
2920                        .insert(response.queuename.clone(), callback);
2921                }
2922                Ok(response.queuename)
2923            }
2924            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2925        }
2926    }
2927    /// Send a message to a queue or exchange in the OpenIAP service
2928    #[tracing::instrument(skip_all)]
2929    pub async fn queue_message(
2930        &self,
2931        config: QueueMessageRequest,
2932    ) -> Result<QueueMessageResponse, OpenIAPError> {
2933        if config.queuename.is_empty() && config.exchangename.is_empty() {
2934            return Err(OpenIAPError::ClientError(
2935                "No queue or exchange name provided".to_string(),
2936            ));
2937        }
2938        let envelope = config.to_envelope();
2939        let result = self.send(envelope, None).await;
2940        match result {
2941            Ok(m) => {
2942                let data = match m.data {
2943                    Some(d) => d,
2944                    None => {
2945                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
2946                    }
2947                };
2948                if m.command == "error" {
2949                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2950                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2951                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2952                }
2953                let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
2954                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2955                Ok(response)
2956            }
2957            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2958        }
2959    }
2960    /// Send message to a queue or exchange in the OpenIAP service, and wait for a reply
2961    #[tracing::instrument(skip_all)]
2962    pub async fn rpc(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
2963        if config.queuename.is_empty() && config.exchangename.is_empty() {
2964            return Err(OpenIAPError::ClientError(
2965                "No queue or exchange name provided".to_string(),
2966            ));
2967        }
2968
2969        let (tx, rx) = oneshot::channel::<String>();
2970        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
2971
2972        // Prepare the callback
2973        let callback: QueueCallbackFn = Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
2974            if let Some(tx) = tx.lock().unwrap().take() {
2975                let _ = tx.send(event.data);
2976            } else {
2977                debug!("Queue already closed");
2978            }
2979            Box::pin(async { None })
2980        });
2981
2982        // Check if we already have a reply queue and callback registered
2983        let mut reply_queue_guard = self.rpc_reply_queue.lock().await;
2984        let mut callback_guard = self.rpc_callback.lock().await;
2985
2986        // If not registered, or if connection state is not Connected/Signedin, re-register
2987        let state = self.get_state();
2988        if reply_queue_guard.is_none() || !(state == ClientState::Connected || state == ClientState::Signedin) {
2989            // Register a new reply queue
2990            let q = self
2991                .register_queue(
2992                    RegisterQueueRequest {
2993                        queuename: "".to_string(),
2994                    },
2995                    callback.clone(),
2996                )
2997                .await?;
2998            *reply_queue_guard = Some(q.clone());
2999            *callback_guard = Some(callback.clone());
3000        } else {
3001            // Already registered, but need to update the callback for this call
3002            if let Some(qname) = reply_queue_guard.as_ref() {
3003                let inner = self.inner.lock().await;
3004                inner.queues.lock().await.insert(qname.clone(), callback.clone());
3005            }
3006        }
3007
3008        let q = reply_queue_guard.as_ref().unwrap().clone();
3009        config.replyto = q.clone();
3010        let envelope = config.to_envelope();
3011
3012        let result = self.send(envelope, None).await;
3013        let rpc_result = match result {
3014            Ok(m) => {
3015                let data = match m.data {
3016                    Some(d) => d,
3017                    None => {
3018                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
3019                    }
3020                };
3021                if m.command == "error" {
3022                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3023                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3024                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3025                }
3026
3027                match tokio::time::timeout(timeout, rx).await {
3028                    Ok(Ok(val)) => Ok(val),
3029                    Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3030                    Err(_) => {
3031                        // Timeout: clear the cached queue so it will be re-registered next time
3032                        *reply_queue_guard = None;
3033                        *callback_guard = None;
3034                        Err(OpenIAPError::ClientError("RPC request timed out".to_string()))
3035                    },
3036                }
3037            }
3038            Err(e) => {
3039                // If we get an error, clear the cached queue so it will be re-registered next time
3040                *reply_queue_guard = None;
3041                *callback_guard = None;
3042                Err(OpenIAPError::ClientError(e.to_string()))
3043            }
3044        };
3045
3046        rpc_result
3047    }
3048    // pub async fn rpc2(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
3049    //     if config.queuename.is_empty() && config.exchangename.is_empty() {
3050    //         return Err(OpenIAPError::ClientError(
3051    //             "No queue or exchange name provided".to_string(),
3052    //         ));
3053    //     }
3054
3055    //     let (tx, rx) = oneshot::channel::<String>();
3056    //     let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3057
3058    //     let q = self
3059    //         .register_queue(
3060    //             RegisterQueueRequest {
3061    //                 queuename: "".to_string(),
3062    //             },
3063    //             Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3064    //                 if let Some(tx) = tx.lock().unwrap().take() {
3065    //                     let _ = tx.send(event.data);
3066    //                 } else {
3067    //                     debug!("Queue already closed");
3068    //                 }
3069    //                 Box::pin(async { None })
3070    //             }),
3071    //         )
3072    //         .await
3073    //         .unwrap();
3074
3075    //     config.replyto = q.clone();
3076    //     let envelope = config.to_envelope();
3077
3078    //     let result = self.send(envelope, None).await;
3079    //     let rpc_result = match result {
3080    //         Ok(m) => {
3081    //             let data = match m.data {
3082    //                 Some(d) => d,
3083    //                 None => {
3084    //                     return Err(OpenIAPError::ClientError("No data in response".to_string()))
3085    //                 }
3086    //             };
3087    //             if m.command == "error" {
3088    //                 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3089    //                     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3090    //                 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3091    //             }
3092
3093    //             match tokio::time::timeout(timeout, rx).await {
3094    //                 Ok(Ok(val)) => Ok(val),
3095    //                 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3096    //                 Err(_) => Err(OpenIAPError::ClientError("RPC request timed out".to_string())),
3097    //             }
3098    //         }
3099    //         Err(e) => {
3100    //             let unregister_err = self.unregister_queue(&q).await.err();
3101    //             if unregister_err.is_some() {
3102    //                 error!("Failed to unregister Response Queue: {:?}", unregister_err);
3103    //             }
3104    //             Err(OpenIAPError::ClientError(e.to_string()))
3105    //         }
3106    //     };
3107
3108    //     if let Err(e) = self.unregister_queue(&q).await {
3109    //         error!("Failed to unregister Response Queue: {:?}", e);
3110    //     } else {
3111    //         debug!("Unregistered Response Queue: {:?}", q);
3112    //     }
3113    //     match rpc_result {
3114    //         Ok(val) => Ok(val),
3115    //         Err(e) => Err(e),
3116    //     }
3117    // }
3118    /// Push a new workitem to a workitem queue
3119    /// If the file is less than 5 megabytes it will be attached to the workitem
3120    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3121    #[tracing::instrument(skip_all)]
3122    pub async fn push_workitem(
3123        &self,
3124        mut config: PushWorkitemRequest,
3125    ) -> Result<PushWorkitemResponse, OpenIAPError> {
3126        if config.wiq.is_empty() && config.wiqid.is_empty() {
3127            return Err(OpenIAPError::ClientError(
3128                "No queue name or id provided".to_string(),
3129            ));
3130        }
3131        for f in &mut config.files {
3132            if f.filename.is_empty() && f.file.is_empty() {
3133                debug!("Filename is empty");
3134            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3135                // does file exist?
3136                if !std::path::Path::new(&f.filename).exists() {
3137                    debug!("File does not exist: {}", f.filename);
3138                } else {
3139                    let filesize = std::fs::metadata(&f.filename).unwrap().len();
3140                    // if filesize is less than 5 meggabytes attach it, else upload
3141                    if filesize < 5 * 1024 * 1024 {
3142                        debug!("File {} exists so ATTACHING it.", f.filename);
3143                        let filename = std::path::Path::new(&f.filename)
3144                            .file_name()
3145                            .unwrap()
3146                            .to_str()
3147                            .unwrap();
3148                        f.file = std::fs::read(&f.filename).unwrap();
3149                        // f.file = compress_file(&f.filename).unwrap();
3150                        // f.compressed = false;
3151                        f.file = util::compress_file_to_vec(&f.filename).unwrap();
3152                        f.compressed = true;
3153                        f.filename = filename.to_string();
3154                        f.id = "findme".to_string();
3155                        trace!(
3156                            "File {} was read and assigned to f.file, size: {}",
3157                            f.filename,
3158                            f.file.len()
3159                        );
3160                    } else {
3161                        debug!("File {} exists so UPLOADING it.", f.filename);
3162                        let filename = std::path::Path::new(&f.filename)
3163                            .file_name()
3164                            .unwrap()
3165                            .to_str()
3166                            .unwrap();
3167                        let uploadconfig = UploadRequest {
3168                            filename: filename.to_string(),
3169                            collectionname: "fs.files".to_string(),
3170                            ..Default::default()
3171                        };
3172                        let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3173                        trace!("File {} was upload as {}", filename, uploadresult.id);
3174                        // f.filename = "".to_string();
3175                        f.id = uploadresult.id.clone();
3176                        f.filename = filename.to_string();
3177                    }
3178                }
3179            } else {
3180                debug!("File {} is already uploaded", f.filename);
3181            }
3182        }
3183        let envelope = config.to_envelope();
3184        let result = self.send(envelope, None).await;
3185        match result {
3186            Ok(m) => {
3187                let data = match m.data {
3188                    Some(data) => data,
3189                    None => {
3190                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3191                    }
3192                };
3193                if m.command == "error" {
3194                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3195                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3196                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3197                }
3198                let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3199                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3200                Ok(response)
3201            }
3202            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3203        }
3204    }
3205    /// Push multiple workitems to a workitem queue
3206    /// If the file is less than 5 megabytes it will be attached to the workitem
3207    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3208    #[tracing::instrument(skip_all)]
3209    pub async fn push_workitems(
3210        &self,
3211        mut config: PushWorkitemsRequest,
3212    ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3213        if config.wiq.is_empty() && config.wiqid.is_empty() {
3214            return Err(OpenIAPError::ClientError(
3215                "No queue name or id provided".to_string(),
3216            ));
3217        }
3218        for wi in &mut config.items {
3219            for f in &mut wi.files {
3220                if f.filename.is_empty() && f.file.is_empty() {
3221                    debug!("Filename is empty");
3222                } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3223                    // does file exist?
3224                    if !std::path::Path::new(&f.filename).exists() {
3225                        debug!("File does not exist: {}", f.filename);
3226                    } else {
3227                        let filesize = std::fs::metadata(&f.filename).unwrap().len();
3228                        // if filesize is less than 5 meggabytes attach it, else upload
3229                        if filesize < 5 * 1024 * 1024 {
3230                            debug!("File {} exists so ATTACHING it.", f.filename);
3231                            let filename = std::path::Path::new(&f.filename)
3232                                .file_name()
3233                                .unwrap()
3234                                .to_str()
3235                                .unwrap();
3236                            f.file = std::fs::read(&f.filename).unwrap();
3237                            // f.file = compress_file(&f.filename).unwrap();
3238                            // f.compressed = false;
3239                            f.file = util::compress_file_to_vec(&f.filename).unwrap();
3240                            f.compressed = true;
3241                            f.filename = filename.to_string();
3242                            f.id = "findme".to_string();
3243                            trace!(
3244                                "File {} was read and assigned to f.file, size: {}",
3245                                f.filename,
3246                                f.file.len()
3247                            );
3248                        } else {
3249                            debug!("File {} exists so UPLOADING it.", f.filename);
3250                            let filename = std::path::Path::new(&f.filename)
3251                                .file_name()
3252                                .unwrap()
3253                                .to_str()
3254                                .unwrap();
3255                            let uploadconfig = UploadRequest {
3256                                filename: filename.to_string(),
3257                                collectionname: "fs.files".to_string(),
3258                                ..Default::default()
3259                            };
3260                            let uploadresult =
3261                                self.upload(uploadconfig, &f.filename).await.unwrap();
3262                            trace!("File {} was upload as {}", filename, uploadresult.id);
3263                            // f.filename = "".to_string();
3264                            f.id = uploadresult.id.clone();
3265                            f.filename = filename.to_string();
3266                        }
3267                    }
3268                } else {
3269                    debug!("File {} is already uploaded", f.filename);
3270                }
3271            }
3272        }
3273        let envelope = config.to_envelope();
3274        let result = self.send(envelope, None).await;
3275        match result {
3276            Ok(m) => {
3277                let data = match m.data {
3278                    Some(data) => data,
3279                    None => {
3280                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3281                    }
3282                };
3283                if m.command == "error" {
3284                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3285                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3286                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3287                }
3288                let response: PushWorkitemsResponse =
3289                    prost::Message::decode(data.value.as_ref())
3290                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3291                Ok(response)
3292            }
3293            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3294        }
3295    }
3296    /// Pop a workitem from a workitem queue, return None if no workitem is available
3297    /// Any files attached to the workitem will be downloaded to the downloadfolder ( default "." )
3298    #[tracing::instrument(skip_all)]
3299    pub async fn pop_workitem(
3300        &self,
3301        config: PopWorkitemRequest,
3302        downloadfolder: Option<&str>,
3303    ) -> Result<PopWorkitemResponse, OpenIAPError> {
3304        if config.wiq.is_empty() && config.wiqid.is_empty() {
3305            return Err(OpenIAPError::ClientError(
3306                "No queue name or id provided".to_string(),
3307            ));
3308        }
3309        let envelope = config.to_envelope();
3310        let result = self.send(envelope, None).await;
3311        match result {
3312            Ok(m) => {
3313                let data = match m.data {
3314                    Some(data) => data,
3315                    None => {
3316                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3317                    }
3318                };
3319                if m.command == "error" {
3320                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3321                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3322                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3323                }
3324                let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3325                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3326
3327                match &response.workitem {
3328                    Some(wi) => {
3329                        for f in &wi.files {
3330                            if !f.id.is_empty() {
3331                                let downloadconfig = DownloadRequest {
3332                                    id: f.id.clone(),
3333                                    collectionname: "fs.files".to_string(),
3334                                    ..Default::default()
3335                                };
3336                                let downloadresult =
3337                                    match self.download(downloadconfig, downloadfolder, None).await
3338                                    {
3339                                        Ok(r) => r,
3340                                        Err(e) => {
3341                                            debug!("Failed to download file: {}", e);
3342                                            continue;
3343                                        }
3344                                    };
3345                                debug!(
3346                                    "File {} was downloaded as {}",
3347                                    f.filename, downloadresult.filename
3348                                );
3349                            }
3350                        }
3351                    }
3352                    None => {
3353                        debug!("No workitem found");
3354                    }
3355                }
3356                Ok(response)
3357            }
3358            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3359        }
3360    }
3361    /// Update a workitem in a workitem queue
3362    /// If the file is less than 5 megabytes it will be attached to the workitem
3363    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3364    /// If a fileid is provided it will be used to update the file
3365    /// if a filename is provided without the id, it will be deleted
3366    #[tracing::instrument(skip_all)]
3367    pub async fn update_workitem(
3368        &self,
3369        mut config: UpdateWorkitemRequest,
3370    ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3371        match &config.workitem {
3372            Some(wiq) => {
3373                if wiq.id.is_empty() {
3374                    return Err(OpenIAPError::ClientError(
3375                        "No workitem id provided".to_string(),
3376                    ));
3377                }
3378            }
3379            None => {
3380                return Err(OpenIAPError::ClientError(
3381                    "No workitem provided".to_string(),
3382                ));
3383            }
3384        }
3385        for f in &mut config.files {
3386            if f.filename.is_empty() && f.file.is_empty() {
3387                debug!("Filename is empty");
3388            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3389                if !std::path::Path::new(&f.filename).exists() {
3390                    debug!("File does not exist: {}", f.filename);
3391                } else {
3392                    debug!("File {} exists so uploading it.", f.filename);
3393                    let filename = std::path::Path::new(&f.filename)
3394                        .file_name()
3395                        .unwrap()
3396                        .to_str()
3397                        .unwrap();
3398                    let uploadconfig = UploadRequest {
3399                        filename: filename.to_string(),
3400                        collectionname: "fs.files".to_string(),
3401                        ..Default::default()
3402                    };
3403                    let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3404                    trace!("File {} was upload as {}", filename, uploadresult.id);
3405                    f.id = uploadresult.id.clone();
3406                    f.filename = filename.to_string();
3407                }
3408            } else {
3409                debug!("Skipped file");
3410            }
3411        }
3412        let envelope = config.to_envelope();
3413        let result = self.send(envelope, None).await;
3414        match result {
3415            Ok(m) => {
3416                let data = match m.data {
3417                    Some(d) => d,
3418                    None => {
3419                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3420                    }
3421                };
3422                if m.command == "error" {
3423                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3424                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3425                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3426                }
3427                let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3428                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3429                Ok(response)
3430            }
3431            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3432        }
3433    }
3434    /// Delete a workitem from a workitem queue
3435    #[tracing::instrument(skip_all)]
3436    pub async fn delete_workitem(
3437        &self,
3438        config: DeleteWorkitemRequest,
3439    ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3440        if config.id.is_empty() {
3441            return Err(OpenIAPError::ClientError(
3442                "No workitem id provided".to_string(),
3443            ));
3444        }
3445        let envelope = config.to_envelope();
3446        let result = self.send(envelope, None).await;
3447        match result {
3448            Ok(m) => {
3449                let data = match m.data {
3450                    Some(d) => d,
3451                    None => {
3452                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3453                    }
3454                };
3455                if m.command == "error" {
3456                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3457                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3458                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3459                }
3460                let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3461                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3462                Ok(response)
3463            }
3464            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3465        }
3466    }
3467    /// Add a workitem queue to openiap instance
3468    #[tracing::instrument(skip_all)]
3469    pub async fn add_workitem_queue(
3470        &self,
3471        config: AddWorkItemQueueRequest,
3472    ) -> Result<WorkItemQueue, OpenIAPError> {
3473        if config.workitemqueue.is_none() {
3474            return Err(OpenIAPError::ClientError(
3475                "No workitem queue name provided".to_string(),
3476            ));
3477        }
3478        let envelope = config.to_envelope();
3479        let result = self.send(envelope, None).await;
3480        match result {
3481            Ok(m) => {
3482                let data = match m.data {
3483                    Some(d) => d,
3484                    None => {
3485                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3486                    }
3487                };
3488                if m.command == "error" {
3489                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3490                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3491                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3492                }
3493                let response: AddWorkItemQueueResponse =
3494                    prost::Message::decode(data.value.as_ref())
3495                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3496                match response.workitemqueue {
3497                    Some(wiq) => Ok(wiq),
3498                    None => {
3499                        return Err(OpenIAPError::ClientError(
3500                            "No workitem queue returned".to_string(),
3501                        ));
3502                    }
3503                }
3504            }
3505            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3506        }
3507    }
3508    /// Update a workitem queue in openiap instance
3509    #[tracing::instrument(skip_all)]
3510    pub async fn update_workitem_queue(
3511        &self,
3512        config: UpdateWorkItemQueueRequest,
3513    ) -> Result<WorkItemQueue, OpenIAPError> {
3514        if config.workitemqueue.is_none() {
3515            return Err(OpenIAPError::ClientError(
3516                "No workitem queue name provided".to_string(),
3517            ));
3518        }
3519        let envelope = config.to_envelope();
3520        let result = self.send(envelope, None).await;
3521        match result {
3522            Ok(m) => {
3523                let data = match m.data {
3524                    Some(d) => d,
3525                    None => {
3526                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3527                    }
3528                };
3529                if m.command == "error" {
3530                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3531                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3532                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3533                }
3534                let response: UpdateWorkItemQueueResponse =
3535                    prost::Message::decode(data.value.as_ref())
3536                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3537                match response.workitemqueue {
3538                    Some(wiq) => Ok(wiq),
3539                    None => {
3540                        return Err(OpenIAPError::ClientError(
3541                            "No workitem queue returned".to_string(),
3542                        ));
3543                    }
3544                }
3545            }
3546            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3547        }
3548    }
3549    /// Delete a workitem queue from openiap instance
3550    #[tracing::instrument(skip_all)]
3551    pub async fn delete_workitem_queue(
3552        &self,
3553        config: DeleteWorkItemQueueRequest,
3554    ) -> Result<(), OpenIAPError> {
3555        if config.wiq.is_empty() && config.wiqid.is_empty() {
3556            return Err(OpenIAPError::ClientError(
3557                "No workitem queue name or id provided".to_string(),
3558            ));
3559        }
3560        let envelope = config.to_envelope();
3561        let result = self.send(envelope, None).await;
3562        match result {
3563            Ok(m) => {
3564                let data = match m.data {
3565                    Some(d) => d,
3566                    None => {
3567                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3568                    }
3569                };
3570                if m.command == "error" {
3571                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3572                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3573                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3574                }
3575                Ok(())
3576            }
3577            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3578        }
3579    }
3580    /// Run custom command on server. Custom commands are commands who is "on trail", they may change and are not ready to be moved to the fixed protobuf format yet
3581    #[tracing::instrument(skip_all)]
3582    pub async fn custom_command(
3583        &self,
3584        config: CustomCommandRequest,
3585        timeout: Option<tokio::time::Duration>,
3586    ) -> Result<String, OpenIAPError> {
3587        if config.command.is_empty() {
3588            return Err(OpenIAPError::ClientError("No command provided".to_string()));
3589        }
3590        let envelope = config.to_envelope();
3591        let result = self.send(envelope, timeout).await;
3592        match result {
3593            Ok(m) => {
3594                let data = match m.data {
3595                    Some(d) => d,
3596                    None => {
3597                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3598                    }
3599                };
3600                if m.command == "error" {
3601                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3602                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3603                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3604                }
3605                let response: CustomCommandResponse =
3606                    prost::Message::decode(data.value.as_ref())
3607                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3608                Ok(response.result)
3609            }
3610            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3611        }
3612    }
3613    /// Delete a package from the database, cleaning up all all files and data
3614    #[tracing::instrument(skip_all)]
3615    pub async fn delete_package(&self, packageid: &str) -> Result<(), OpenIAPError> {
3616        let config = DeletePackageRequest::byid(packageid);
3617        let envelope = config.to_envelope();
3618        let result = self.send(envelope, None).await;
3619        match result {
3620            Ok(m) => {
3621                let data = match m.data {
3622                    Some(data) => data,
3623                    None => {
3624                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3625                    }
3626                };
3627                if m.command == "error" {
3628                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3629                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3630                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3631                }
3632                // prost::Message::decode(data.value.as_ref())
3633                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3634                Ok(())
3635            }
3636            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3637        }
3638    }
3639    /// Start Agent
3640    #[tracing::instrument(skip_all)]
3641    pub async fn start_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3642        let config = StartAgentRequest::byid(agentid);
3643        let envelope = config.to_envelope();
3644        let result = self.send(envelope, None).await;
3645        match result {
3646            Ok(m) => {
3647                let data = match m.data {
3648                    Some(d) => d,
3649                    None => {
3650                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3651                    }
3652                };
3653                if m.command == "error" {
3654                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3655                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3656                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3657                }
3658                // prost::Message::decode(data.value.as_ref())
3659                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3660                Ok(())
3661            }
3662            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3663        }
3664    }
3665    /// Stop an agent, this will cleanup all resources and stop the agent
3666    #[tracing::instrument(skip_all)]
3667    pub async fn stop_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3668        let config = StopAgentRequest::byid(agentid);
3669        let envelope = config.to_envelope();
3670        let result = self.send(envelope, None).await;
3671        match result {
3672            Ok(m) => {
3673                let data = match m.data {
3674                    Some(d) => d,
3675                    None => {
3676                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3677                    }
3678                };
3679                if m.command == "error" {
3680                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3681                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3682                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3683                }
3684                // prost::Message::decode(data.value.as_ref())
3685                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3686                Ok(())
3687            }
3688            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3689        }
3690    }
3691    /// Delete a pod from an agent, on kubernetes this will remove the pod and kubernetes will re-create it, on docker this will remove the pod. Then use start_agent to start the agent again
3692    #[tracing::instrument(skip_all)]
3693    pub async fn delete_agent_pod(&self, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
3694        let config = DeleteAgentPodRequest::byid(agentid, podname);
3695        let envelope = config.to_envelope();
3696        let result = self.send(envelope, None).await;
3697        match result {
3698            Ok(m) => {
3699                let data = match m.data {
3700                    Some(d) => d,
3701                    None => {
3702                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3703                    }
3704                };
3705                if m.command == "error" {
3706                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3707                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3708                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3709                }
3710                // prost::Message::decode(data.value.as_ref())
3711                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3712                Ok(())
3713            }
3714            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3715        }
3716    }
3717    /// Delete an agent, this will cleanup all resources and delete the agent
3718    #[tracing::instrument(skip_all)]
3719    pub async fn delete_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3720        let config = DeleteAgentRequest::byid(agentid);
3721        let envelope = config.to_envelope();
3722        let result = self.send(envelope, None).await;
3723        match result {
3724            Ok(m) => {
3725                let data = match m.data {
3726                    Some(d) => d,
3727                    None => {
3728                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3729                    }
3730                };
3731                if m.command == "error" {
3732                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3733                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3734                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3735                }
3736                // prost::Message::decode(data.value.as_ref())
3737                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3738                Ok(())
3739            }
3740            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3741        }
3742    }
3743    /// Get all pods associated with an agent, if stats is true, it will return memory and cpu usage for each pod
3744    #[tracing::instrument(skip_all)]
3745    pub async fn get_agent_pods(&self, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
3746        let config = GetAgentPodsRequest::byid(agentid, stats);
3747        let envelope = config.to_envelope();
3748        let result = self.send(envelope, None).await;
3749        match result {
3750            Ok(m) => {
3751                let data = match m.data {
3752                    Some(d) => d,
3753                    None => {
3754                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3755                    }
3756                };
3757                if m.command == "error" {
3758                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3759                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3760                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3761                }
3762                let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
3763                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3764                Ok(response.results)
3765            }
3766            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3767        }
3768    }
3769    /// Get logs from a pod associated with an agent, leave podname empty to get logs from all pods
3770    #[tracing::instrument(skip_all)]
3771    pub async fn get_agent_pod_logs(
3772        &self,
3773        agentid: &str,
3774        podname: &str,
3775    ) -> Result<String, OpenIAPError> {
3776        let config = GetAgentLogRequest::new(agentid, podname);
3777        let envelope = config.to_envelope();
3778        let result = self.send(envelope, None).await;
3779        match result {
3780            Ok(m) => {
3781                let data = match m.data {
3782                    Some(d) => d,
3783                    None => {
3784                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3785                    }
3786                };
3787                if m.command == "error" {
3788                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3789                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3790                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3791                }
3792                let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
3793                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3794                Ok(response.result)
3795            }
3796            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3797        }
3798    }
3799
3800    /// Create/update a customer in the OpenIAP service. If stripe has been configured, it will create or update a customer in stripe as well
3801    /// A customer is a customer object that can only be updated using this function, and 2 roles ( customername admins and customername users )
3802    #[tracing::instrument(skip_all)]
3803    pub async fn ensure_customer(
3804        &self,
3805        config: EnsureCustomerRequest,
3806    ) -> Result<EnsureCustomerResponse, OpenIAPError> {
3807        if config.customer.is_none() && config.stripe.is_none() {
3808            return Err(OpenIAPError::ClientError(
3809                "No customer or stripe provided".to_string(),
3810            ));
3811        }
3812        let envelope = config.to_envelope();
3813        let result = self.send(envelope, None).await;
3814        match result {
3815            Ok(m) => {
3816                let data = match m.data {
3817                    Some(d) => d,
3818                    None => {
3819                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3820                    }
3821                };
3822                if m.command == "error" {
3823                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3824                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3825                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3826                }
3827                let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
3828                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3829                Ok(response)
3830            }
3831            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3832        }
3833    }
3834    /// Create a new workflow instance, to be used to workflow in/out nodes in NodeRED
3835    #[tracing::instrument(skip_all)]
3836    pub async fn create_workflow_instance(
3837        &self,
3838        config: CreateWorkflowInstanceRequest,
3839    ) -> Result<String, OpenIAPError> {
3840        if config.workflowid.is_empty() {
3841            return Err(OpenIAPError::ClientError(
3842                "No workflow id provided".to_string(),
3843            ));
3844        }
3845        let envelope = config.to_envelope();
3846        let result = self.send(envelope, None).await;
3847        match result {
3848            Ok(m) => {
3849                let data = match m.data {
3850                    Some(d) => d,
3851                    None => {
3852                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3853                    }
3854                };
3855                if m.command == "error" {
3856                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3857                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3858                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3859                }
3860                let response: CreateWorkflowInstanceResponse =
3861                    prost::Message::decode(data.value.as_ref())
3862                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3863                Ok(response.instanceid)
3864            }
3865            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3866        }
3867    }
3868
3869    /// Invoke a workflow in the OpenRPA robot where robotid is the userid of the user the robot is running as, or a roleid with RPA enabled
3870    #[tracing::instrument(skip_all)]
3871    pub async fn invoke_openrpa(
3872        &self,
3873        config: InvokeOpenRpaRequest,
3874    ) -> Result<String, OpenIAPError> {
3875        if config.robotid.is_empty() {
3876            return Err(OpenIAPError::ClientError(
3877                "No robot id provided".to_string(),
3878            ));
3879        }
3880        if config.workflowid.is_empty() {
3881            return Err(OpenIAPError::ClientError(
3882                "No workflow id provided".to_string(),
3883            ));
3884        }
3885
3886        let (tx, rx) = oneshot::channel::<String>();
3887        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3888
3889        let q = self
3890            .register_queue(
3891                RegisterQueueRequest {
3892                    queuename: "".to_string(),
3893                },
3894                Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3895                    let tx = tx.clone();
3896                    Box::pin(async move {
3897                        let json = event.data.clone();
3898                        let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3899                        let command: String = obj["command"].as_str().unwrap().to_string();
3900                        debug!("Received event: {:?}", event);
3901                        if command.eq("invokesuccess") {
3902                            debug!("Robot successfully started running workflow");
3903                        } else if command.eq("invokeidle") {
3904                            debug!("Workflow went idle");
3905                        } else if command.eq("invokeerror") {
3906                            debug!("Robot failed to run workflow");
3907                            let tx = tx.lock().unwrap().take().unwrap();
3908                            tx.send(event.data).unwrap();
3909                        } else if command.eq("timeout") {
3910                            debug!("No robot picked up the workflow");
3911                            let tx = tx.lock().unwrap().take().unwrap();
3912                            tx.send(event.data).unwrap();
3913                        } else if command.eq("invokecompleted") {
3914                            debug!("Robot completed running workflow");
3915                            let tx = tx.lock().unwrap().take().unwrap();
3916                            tx.send(event.data).unwrap();
3917                        } else {
3918                            let tx = tx.lock().unwrap().take().unwrap();
3919                            tx.send(event.data).unwrap();
3920                        }
3921                        None
3922                    })
3923                }),
3924            )
3925            .await
3926            .unwrap();
3927        debug!("Registered Response Queue: {:?}", q);
3928        let data = format!(
3929            "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"payload\": {}}}",
3930            config.workflowid, config.payload
3931        );
3932        debug!("Send Data: {}", data);
3933        debug!("To Queue: {} With reply to: {}", config.robotid, q);
3934        let config = QueueMessageRequest {
3935            queuename: config.robotid.clone(),
3936            replyto: q.clone(),
3937            data,
3938            ..Default::default()
3939        };
3940
3941        let envelope = config.to_envelope();
3942
3943        let result = self.send(envelope, None).await;
3944        match result {
3945            Ok(m) => {
3946                let data = match m.data {
3947                    Some(d) => d,
3948                    None => {
3949                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
3950                    }
3951                };
3952                if m.command == "error" {
3953                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3954                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3955                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3956                }
3957                // prost::Message::decode(data.value.as_ref())
3958                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3959
3960                let json = rx.await.unwrap();
3961                debug!("Received json result: {:?}", json);
3962                let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3963                let command: String = obj["command"].as_str().unwrap().to_string();
3964                let mut data = "".to_string();
3965                if obj["data"].as_str().is_some() {
3966                    data = obj["data"].as_str().unwrap().to_string();
3967                } else if obj["data"].as_object().is_some() {
3968                    data = obj["data"].to_string();
3969                }
3970                if !command.eq("invokecompleted") {
3971                    if command.eq("timeout") {
3972                        return Err(OpenIAPError::ServerError("Timeout".to_string()));
3973                    } else {
3974                        if data.is_empty() {
3975                            return Err(OpenIAPError::ServerError(
3976                                "Error with no message".to_string(),
3977                            ));
3978                        }
3979                        return Err(OpenIAPError::ServerError(data));
3980                    }
3981                }
3982                let response = self.unregister_queue(&q).await;
3983                match response {
3984                    Ok(_) => {
3985                        debug!("Unregistered Response Queue: {:?}", q);
3986                    }
3987                    Err(e) => {
3988                        error!("Failed to unregister Response Queue: {:?}", e);
3989                    }
3990                }
3991                Ok(data)
3992            }
3993            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3994        }
3995    }
3996}
3997