openiap_client/
lib.rs

1#![warn(missing_docs)]
2//! The `openiap.client` crate provides the [Client] struct and its methods.
//! For now only gRPC and WebSocket clients are implemented; REST, named pipe and TCP clients are planned for later.
4//! Initialize a new client, by calling the [Client::new_connect] method.
5//! ```
6//! use openiap_client::{OpenIAPError, Client, QueryRequest};
7//! #[tokio::main]
8//! async fn main() -> Result<(), OpenIAPError> {
9//!     let client = Client::new_connect("").await?;
10//!     let q = client.query( QueryRequest::with_projection(
11//!         "entities",
12//!         "{}",
13//!         "{\"name\":1}"
14//!     )).await?;
15//!     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
16//!     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
17//!     for item in items {
18//!         println!("Item: {:?}", item);
19//!     }
20//!     Ok(())
21//! }
22//! ```
23
24pub use openiap_proto::errors::*;
25pub use openiap_proto::protos::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use protos::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30use std::fmt::{Display,Formatter};
31use tokio::task::JoinHandle;
32use tokio_tungstenite::WebSocketStream;
33use tracing::{debug, error, info, trace};
/// Boxed, thread-safe (`Send + Sync`), `'static` error trait object.
type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Crate-local `Result` alias whose error type defaults to [StdError] when not spelled out.
type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{set_otel_url, enable_tracing, disable_tracing};
55pub use crate::otel::{set_f64_observable_gauge, set_u64_observable_gauge, set_i64_observable_gauge, disable_observable_gauge};
56
/// One-shot sender carrying a single [Envelope]; stored per pending query in `ClientInner::queries`.
type QuerySender = oneshot::Sender<Envelope>;
/// Multi-message sender of raw byte chunks; stored per open stream in `ClientInner::streams`.
type StreamSender = mpsc::Sender<Vec<u8>>;
/// WebSocket stream over a (possibly TLS-wrapped) tokio TCP stream; held by `ClientEnum::WS`.
type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
60use futures::StreamExt;
61use async_channel::unbounded;
/// Version string of this client; used as the default agent version in `Client::new`
/// and passed to the telemetry initialisation when the `otel` feature is enabled.
const VERSION: &str = "0.0.35";
63
64
/// The `Client` struct provides the client for the OpenIAP service.
/// Initialize a new client, by calling the [Client::new_connect] method.
///
/// Every piece of mutable state sits behind an `Arc`, and the channel ends are
/// cheap handles, so cloning a `Client` yields another handle to the same state.
#[derive(Clone)]
pub struct Client {
    /// Set once connect has completed its first setup; later connect calls
    /// are routed to reconnect instead.
    connect_called: Arc<std::sync::Mutex<bool>>,
    /// The tokio runtime created and stored by the blocking [Client::connect] call;
    /// `None` until then.
    runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,

    /// Keep track of usage of the client
    stats: Arc<std::sync::Mutex<ClientStatistics>>,

    /// Join handles of tokio tasks — presumably the background tasks spawned by the
    /// connection setup; the code that fills this list is outside this part of the file.
    task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
    /// The inner client object
    pub client: Arc<std::sync::Mutex<ClientEnum>>,
    /// The signed in user.
    user: Arc<std::sync::Mutex<Option<User>>>,
    /// The inner client.
    pub inner: Arc<Mutex<ClientInner>>,
    /// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
    pub config: Arc<std::sync::Mutex<Option<Config>>>,
    /// Should client automatically reconnect, if disconnected?
    pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
    /// URL used to connect to server, processed and without credentials
    pub url: Arc<std::sync::Mutex<String>>,
    /// Username used to connect to server
    pub username: Arc<std::sync::Mutex<String>>,
    /// Password used to connect to server
    pub password: Arc<std::sync::Mutex<String>>,
    /// JWT token used to connect to server
    pub jwt: Arc<std::sync::Mutex<String>>,
    /// Service name reported to telemetry; defaults to `"rust"`.
    service_name: Arc<std::sync::Mutex<String>>,
    /// Agent name reported to telemetry; defaults to `"rust"`.
    agent_name: Arc<std::sync::Mutex<String>>,
    /// Agent version reported to telemetry; defaults to the crate's `VERSION`.
    agent_version: Arc<std::sync::Mutex<String>>,
    /// Sending end of the unbounded [ClientEvent] channel.
    event_sender: async_channel::Sender<ClientEvent>,
    /// Receiving end of the unbounded [ClientEvent] channel.
    event_receiver: async_channel::Receiver<ClientEvent>,
    /// Sending end of the unbounded channel of outgoing [Envelope]s.
    out_envelope_sender: async_channel::Sender<Envelope>,
    /// Receiving end of the unbounded channel of outgoing [Envelope]s.
    out_envelope_receiver: async_channel::Receiver<Envelope>,
    /// Optional queue name — presumably the reply queue used for RPC-style calls; TODO confirm.
    rpc_reply_queue: Arc<tokio::sync::Mutex<Option<String>>>,
    /// Optional queue callback — presumably invoked for messages on the RPC reply queue; TODO confirm.
    rpc_callback: Arc<tokio::sync::Mutex<Option<QueueCallbackFn>>>,

    /// The client connection state.
    pub state: Arc<std::sync::Mutex<ClientState>>,
    /// Increasing message count, used as unique id for messages.
    pub msgcount: Arc<std::sync::Mutex<i32>>,
    /// Reconnect interval in milliseconds, this will slowly increase if we keep getting disconnected.
    pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
    /// The default timeout for requests
    pub default_timeout: Arc<std::sync::Mutex<tokio::time::Duration>>,
}
/// The `ClientStatistics` struct provides the statistics for usage of the client
///
/// Every field is a `u64` counter starting at zero (`Default`). Apart from the
/// connection and package counters, each field is named after a client operation;
/// presumably it counts calls to that operation — the code that increments the
/// counters is outside this part of the file.
#[derive(Clone, Default)]
pub struct ClientStatistics {
    // Connection-level counters.
    connection_attempts: u64,
    connections: u64,
    // Presumably packages sent (tx) and received (rx) — TODO confirm.
    package_tx: u64,
    package_rx: u64,
    // Per-operation counters, one per client command.
    signin: u64,
    download: u64,
    getdocumentversion: u64,
    customcommand: u64,
    listcollections: u64,
    createcollection: u64,
    dropcollection: u64,
    ensurecustomer: u64,
    invokeopenrpa: u64,
    registerqueue: u64,
    registerexchange: u64,
    unregisterqueue: u64,
    watch: u64,
    unwatch: u64,
    queuemessage: u64,
    pushworkitem: u64,
    pushworkitems: u64,
    popworkitem: u64,
    updateworkitem: u64,
    deleteworkitem: u64,
    addworkitemqueue: u64,
    updateworkitemqueue: u64,
    deleteworkitemqueue: u64,
    getindexes: u64,
    createindex: u64,
    dropindex: u64,
    upload: u64,
    query: u64,
    count: u64,
    distinct: u64,
    aggregate: u64,
    insertone: u64,
    insertmany: u64,
    insertorupdateone: u64,
    insertorupdatemany: u64,
    updateone: u64,
    updatedocument: u64,
    deleteone: u64,
    deletemany: u64,
}
162// type QueueCallbackFn = Box<dyn Fn(&Client, QueueEvent) -> Option<String> + Send + Sync>;
163use futures::future::BoxFuture;
/// Shared, thread-safe async callback for queue events: it receives a client handle
/// and the [QueueEvent] and resolves to an optional `String`
/// (presumably the reply payload — TODO confirm against the call site).
type QueueCallbackFn =
    Arc<dyn Fn(Arc<Client>, QueueEvent) -> BoxFuture<'static, Option<String>> + Send + Sync>;
166
167// type ExchangeCallbackFn = Box<dyn Fn(&Client, QueueEvent) + Send + Sync>;
/// The `ClientInner` struct provides the inner client for the OpenIAP service.
///
/// It groups the lookup tables of in-flight work. Each table is a `HashMap` keyed by
/// `String`, behind its own `Arc` and async mutex.
#[derive(Clone)]
pub struct ClientInner {
    /// List of queries (messages sent to the server that we are waiting on a response for)
    pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
    /// Active streams the server (or client) has opened
    pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
    /// List of active watches (change streams), each with the callback to run per [WatchEvent]
    #[allow(clippy::type_complexity)]
    pub watches:
        Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
    /// List of active queues (message queues / MQTT queues or exchanges), each with its callback
    #[allow(clippy::type_complexity)]
    pub queues:
        Arc<Mutex<std::collections::HashMap<String, QueueCallbackFn>>>,
}
/// Client enum, used to determine which transport the client uses.
#[derive(Clone, Debug)]
pub enum ClientEnum {
    /// Not set yet
    None,
    /// Used when the client connects using gRPC; wraps the generated service client over a tonic channel
    Grpc(FlowServiceClient<tonic::transport::Channel>),
    /// Used when the client connects using websockets; wraps the shared socket
    WS(Arc<Mutex<Sock>>)
}
/// Client event enum, used to determine which event has occurred.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientEvent {
    /// The client is connecting
    Connecting,
    /// The client has connected
    Connected,
    /// The client has disconnected; carries a `String` (presumably the reason — TODO confirm)
    Disconnected(String),
    /// The client has signed in
    SignedIn,
    // The client has signed out
    // SignedOut,
    // The client has received a message
    // Message(Envelope),
    // The client has received a ping event from the server
    // Ping,
    // The client has received a stream
    // Stream(Vec<u8>),
    // The client has received a watch event
    // Watch(WatchEvent),
    // The client has received a queue event
    // Queue(QueueEvent),
}
/// Client state enum, describing where the client is in its connection lifecycle.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientState {
    /// The client is disconnected
    Disconnected,
    /// The client is connecting
    Connecting,
    /// The client is connected
    Connected,
    /// The client is signed in and connected
    Signedin,
}

impl Display for ClientState {
    /// Writes the variant name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Disconnected => "Disconnected",
            Self::Connecting => "Connecting",
            Self::Connected => "Connected",
            Self::Signedin => "Signedin",
        };
        f.write_str(label)
    }
}
240
/// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
///
/// It is deserialized from the JSON body returned by the server's `/config` endpoint
/// (see `Client::load_config`). Every field is marked `#[serde(default)]`, so a key
/// missing from the response falls back to the type's default (empty string, `false`,
/// `0`, or empty list) instead of failing the parse.
#[derive(Debug, Clone, serde::Deserialize)]
// Most fields are deserialized but not read by the code visible in this part of the file.
#[allow(dead_code)]
pub struct Config {
    #[serde(default)]
    wshost: String,
    #[serde(default)]
    wsurl: String,
    // When non-empty, `load_config` passes this to telemetry as the API host name.
    #[serde(default)]
    domain: String,
    #[serde(default)]
    auto_create_users: bool,
    #[serde(default)]
    namespace: String,
    #[serde(default)]
    agent_domain_schema: String,
    #[serde(default)]
    version: String,
    #[serde(default)]
    validate_emails: bool,
    #[serde(default)]
    forgot_pass_emails: bool,
    #[serde(default)]
    supports_watch: bool,
    #[serde(default)]
    amqp_enabled_exchange: bool,
    #[serde(default)]
    multi_tenant: bool,
    #[serde(default)]
    enable_entity_restriction: bool,
    #[serde(default)]
    enable_web_tours: bool,
    #[serde(default)]
    collections_with_text_index: Vec<String>,
    #[serde(default)]
    timeseries_collections: Vec<String>,
    #[serde(default)]
    ping_clients_interval: i32,
    #[serde(default)]
    validlicense: bool,
    #[serde(default)]
    forceddomains: Vec<String>,
    #[serde(default)]
    grafana_url: String,
    // The three OpenTelemetry endpoints; when non-empty they override the
    // OTEL_METRIC_URL / OTEL_TRACE_URL / OTEL_LOG_URL environment variables in `load_config`.
    #[serde(default)]
    otel_metric_url: String,
    #[serde(default)]
    otel_trace_url: String,
    #[serde(default)]
    otel_log_url: String,
    // Controls whether `load_config` initialises telemetry (only with the `otel` feature).
    #[serde(default)]
    enable_analytics: bool,
}
294impl std::fmt::Debug for ClientInner {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        f.debug_struct("ClientInner")
297            // .field("client", &self.client)
298            .field("queries", &self.queries)
299            .field("streams", &self.streams)
300            .finish()
301    }
302}
303impl Default for Client {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309impl Client {
310    /// Create a new client.
311    pub fn new() -> Self {
312        let (ces, cer) = unbounded::<ClientEvent>();
313        let (out_es, out_er) = unbounded::<Envelope>();
314        Self {
315            task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
316            stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
317            user: Arc::new(std::sync::Mutex::new(None)),
318            client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
319            connect_called: Arc::new(std::sync::Mutex::new(false)),
320            runtime: Arc::new(std::sync::Mutex::new(None)),
321            msgcount: Arc::new(std::sync::Mutex::new(-1)),
322            reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
323            rpc_reply_queue: Arc::new(tokio::sync::Mutex::new(None)),
324            rpc_callback: Arc::new(tokio::sync::Mutex::new(None)),
325            inner: Arc::new(Mutex::new(ClientInner {
326                queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
327                streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
328                watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
329                queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
330            })),
331            config: Arc::new(std::sync::Mutex::new(None)),
332            auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
333            url: Arc::new(std::sync::Mutex::new("".to_string())),
334            username: Arc::new(std::sync::Mutex::new("".to_string())),
335            password: Arc::new(std::sync::Mutex::new("".to_string())),
336            jwt: Arc::new(std::sync::Mutex::new("".to_string())),
337            service_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
338            agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
339            agent_version: Arc::new(std::sync::Mutex::new(VERSION.to_string())),
340            event_sender: ces,
341            event_receiver: cer,
342            out_envelope_sender: out_es,
343            out_envelope_receiver: out_er,
344            state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
345            default_timeout: Arc::new(std::sync::Mutex::new(Duration::from_secs(30))),
346        }
347    }
348    /// Connect the client to the OpenIAP server.
349    #[tracing::instrument(skip_all)]
350    pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
351        let rt = match tokio::runtime::Runtime::new() {
352            Ok(rt) => rt,
353            Err(e) => {
354                return Err(OpenIAPError::ClientError(format!(
355                    "Failed to create tokio runtime: {}",
356                    e
357                )));
358            }
359        };
360        self.set_runtime(Some(rt));
361        tokio::task::block_in_place(|| {
362            let handle = self.get_runtime_handle();
363            handle.block_on(self.connect_async(dst))
364        })
365    }
366
367    /// Load the configuration from the server.
368    #[allow(unused_variables)]
369    pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
370        let config: Option<Config>;
371        let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
372        let mut port = url.port().unwrap_or(80);
373        if issecure {
374            port = 443;
375        }
376        let mut host = url.host_str().unwrap_or("localhost.openiap.io").replace("grpc.", "");
377        if host.starts_with("api-grpc") {
378            host = "api".to_string();
379        }
380        if port == 50051 {
381            port = 3000;
382        }
383        let configurl = if issecure {
384            format!(
385                "{}://{}:{}/config",
386                "https",
387                host,
388                port
389            )
390        } else {
391            format!(
392                "{}://{}:{}/config",
393                "http",
394                host,
395                port
396            )
397        };
398
399        let configurl = url::Url::parse(configurl.as_str())
400            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
401        trace!("Getting config from: {}", configurl);
402        let o = minreq::get(configurl).with_timeout(5).send();
403        match o {
404            Ok(_) => {
405                let response = match o {
406                    Ok(response) => response,
407                    Err(e) => {
408                        error!("Failed to get config: {}", e);
409                        return None;
410                    }
411                };
412                if response.status_code == 200 {
413                    let body = response.as_str().unwrap();
414                    config = Some(match serde_json::from_str(body) {
415                        Ok(config) => config,
416                        Err(e) => {
417                            error!("Failed to parse config: {}", e);
418                            return None;
419                        }
420                    });
421                } else {
422                    config = None;
423                }
424            }
425            Err(e) => {
426                error!("Failed to get config: {}", e);
427                return None;
428            }
429        }
430        let mut _enable_analytics = true;
431        let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
432        let mut _otel_trace_url = std::env::var("OTEL_TRACE_URL").unwrap_or_default();
433        let mut _otel_log_url = std::env::var("OTEL_LOG_URL").unwrap_or_default();
434        let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
435        if apihostname.starts_with("grpc.") {
436            apihostname = apihostname[5..].to_string();
437        }
438    
439        if config.is_some() {
440            let config = config.as_ref().unwrap();
441            if !config.otel_metric_url.is_empty() {
442                _otel_metric_url = config.otel_metric_url.clone();
443            }
444            if !config.otel_trace_url.is_empty() {
445                _otel_trace_url = config.otel_trace_url.clone();
446            }
447            if !config.otel_log_url.is_empty() {
448                _otel_log_url = config.otel_log_url.clone();
449            }
450            if !config.domain.is_empty() {
451                apihostname = config.domain.clone();
452            }
453            _enable_analytics = config.enable_analytics;
454        }
455        #[cfg(feature = "otel")]
456        if _enable_analytics {
457            let service_name = self.get_service_name();
458            let agent_name = self.get_agent_name();
459            let agent_version = self.get_agent_version();
460            match otel::init_telemetry(&service_name, &agent_name, &agent_version, VERSION, &apihostname, _otel_metric_url.as_str(), 
461            _otel_trace_url.as_str(),  _otel_log_url.as_str(), 
462            &self.stats) {
463                Ok(_) => (),
464                Err(e) => {
465                    error!("Failed to initialize telemetry: {}", e);
466                    return None;
467                }
468            }
469        }
470        config
471    }
472
473    /// Connect the client to the OpenIAP server.
474    #[tracing::instrument(skip_all)]
475    pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
476        #[cfg(test)]
477        {   
478            // enable_tracing("openiap=trace", "new");
479            // enable_tracing("openiap=debug", "new");
480            // enable_tracing("trace", "");
481            enable_tracing("openiap=error", "");
482            // enable_tracing("openiap=debug", "");
483        }
484        if self.is_connect_called() {
485            self.set_auto_reconnect(true);
486            return self.reconnect().await;
487        }
488        let mut strurl = dst.to_string();
489        if strurl.is_empty() {
490            strurl = std::env::var("apiurl").unwrap_or("".to_string());
491            if strurl.is_empty() {
492                strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
493            }
494            if strurl.is_empty() {
495                strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
496            }
497        }
498        if strurl.is_empty() {
499            return Err(OpenIAPError::ClientError("No URL provided".to_string()));
500        }
501        let url = url::Url::parse(strurl.as_str())
502            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
503        let mut username = "".to_string();
504        let mut password = "".to_string();
505        if let Some(p) = url.password() {
506            password = p.to_string();
507        }
508        if !url.username().is_empty() {
509            username = url.username().to_string();
510        }
511        if !username.is_empty() && !password.is_empty() {
512            self.set_username(&username);
513            self.set_password(&password);
514        }
515        let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
516        if url.scheme() != "http"
517            && url.scheme() != "https"
518            && url.scheme() != "grpc"
519            && url.scheme() != "ws"
520            && url.scheme() != "wss"
521        {
522            return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
523        }
524        if url.scheme() == "grpc" {
525            if url.port() == Some(443) {
526                strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
527            } else { 
528                strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
529            }
530        }
531        let url = url::Url::parse(strurl.as_str())
532            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
533        if url.port().is_none() {
534            strurl = format!(
535                "{}://{}",
536                url.scheme(),
537                url.host_str().unwrap_or("app.openiap.io")
538            );
539        } else {
540            strurl = format!(
541                "{}://{}:{}",
542                url.scheme(),
543                url.host_str().unwrap_or("localhost.openiap.io"),
544                url.port().unwrap_or(80)
545            );
546        }
547        debug!("Connecting to {}", strurl);
548        let config = self.load_config(strurl.as_str(), &url).await;
549        if !usegprc {
550            strurl = format!("{}/ws/v2", strurl);
551
552            let (_stream_tx, stream_rx) = mpsc::channel(60);
553
554            let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
555                Ok((socket, _)) => socket,
556                Err(e) => {
557                    return Err(OpenIAPError::ClientError(format!(
558                        "Failed to connect to WS: {}",
559                        e
560                    )));
561                }
562            };
563            self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
564            self.set_connect_called(true);
565            self.set_config(config);
566            self.set_url(&strurl);
567            match self.setup_ws(&strurl).await {
568                Ok(_) => (),
569                Err(e) => {
570                    return Err(OpenIAPError::ClientError(format!(
571                        "Failed to setup WS: {}",
572                        e
573                    )));
574                }
575            }
576            let client2 = self.clone();
577            // tokio::task::Builder::new().name("Old Websocket receiver").spawn(async move {
578            tokio::task::spawn(async move {
579                tokio_stream::wrappers::ReceiverStream::new(stream_rx)
580                    .for_each(|envelope: Envelope| async {
581                        let command = envelope.command.clone();
582                        let rid = envelope.rid.clone();
583                        let id = envelope.id.clone();
584                        trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
585                        client2.parse_incomming_envelope(envelope).await;
586                    })
587                    .await;
588            }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn Old Websocket receiver task: {:?}", e)))?;
589        } else {
590            if url.scheme() == "http" {
591                let response = Client::connect_grpc(strurl.clone()).await;
592                match response {
593                    Ok(client) => {
594                        self.set_client(ClientEnum::Grpc(client));
595                    }
596                    Err(e) => {
597                        return Err(OpenIAPError::ClientError(format!(
598                            "Failed to connect: {}",
599                            e
600                        )));
601                    }
602                }
603            } else {
604                let uri = tonic::transport::Uri::builder()
605                    .scheme(url.scheme())
606                    .authority(url.host().unwrap().to_string())
607                    .path_and_query("/")
608                    .build();
609                let uri = match uri {
610                    Ok(uri) => uri,
611                    Err(e) => {
612                        return Err(OpenIAPError::ClientError(format!(
613                            "Failed to build URI: {}",
614                            e
615                        )));
616                    }
617                };
618                let channel = Channel::builder(uri)
619                    .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
620                let channel = match channel {
621                    Ok(channel) => channel,
622                    Err(e) => {
623                        return Err(OpenIAPError::ClientError(format!(
624                            "Failed to build channel: {}",
625                            e
626                        )));
627                    }
628                };
629                let channel = channel.connect().await;
630                let channel = match channel {
631                    Ok(channel) => channel,
632                    Err(e) => {
633                        return Err(OpenIAPError::ClientError(format!(
634                            "Failed to connect: {}",
635                            e
636                        )));
637                    }
638                };
639                self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
640            }
641            self.set_connect_called(true);
642            self.set_config(config);
643            self.set_url(&strurl);
644            self.setup_grpc_stream().await?;
645        };
646        self.post_connected().await
647    }
648
649    /// Connect will initializes a new client and starts a connection to an OpenIAP server.\
650    /// Use "" to autodetect the server from the environment variables (apiurl or grpcapiurl), or provide a URL.\
651    /// \
652    /// You can add username and password, to login using local provider, or set them using OPENIAP_USERNAME and OPENIAP_PASSWORD environment variables.
653    /// It is highly recommended to not user username and password, but instead use a JWT token, set using the OPENIAP_JWT (or jwt) environment variable.
654    /// You can use the openiap vs.code extension to manage this, if you need to generate one your self, login to the OpenIAP server and then open the /jwtlong page.
655    /// If credentials are not provided, the client will run as guest.\
656    /// If credentials are found, it will call [Client::signin] after successfully connecting to the server.
657    /// 
658    /// To troubleshoot issues, call [enable_tracing].
659    /// ```
660    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
661    /// #[tokio::main]
662    /// async fn main() -> Result<(), OpenIAPError> {
663    ///     let client = Client::new_connect("").await?;
664    ///     let q = client.query( QueryRequest::with_projection(
665    ///         "entities",
666    ///         "{}",
667    ///         "{\"name\":1}"
668    ///     )).await?;
669    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
670    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
671    ///     for item in items {
672    ///         println!("Item: {:?}", item);
673    ///     }
674    ///     Ok(())
675    /// }
676    /// ```
677    #[tracing::instrument(skip_all)]
678    pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
679        #[cfg(test)]
680        {
681            enable_tracing("openiap=error", "");
682        }
683        let client  = Client::new();
684        client.connect_async(dst).await?;
685        Ok(client)
686    }
687    /// Handle auto-signin after a connection has been established.
688    pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
689        if self.get_username().is_empty() && self.get_password().is_empty() {
690            self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
691            self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
692        }
693        if !self.get_username().is_empty() && !self.get_password().is_empty() {
694            debug!("Signing in with username: {}", self.get_username());
695            let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
696            let loginresponse = self.signin(signin).await;
697            match loginresponse {
698                Ok(response) => {
699                    self.reset_reconnect_ms();
700                    self.set_connected(ClientState::Connected, None);
701                    info!("Signed in as {}", response.user.as_ref().unwrap().username);
702                    Ok(())
703                }
704                Err(_e) => {
705                    self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
706                    Err(_e)
707                }
708            }
709        } else {
710            self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
711            if self.get_jwt().is_empty() {
712                self.set_jwt(&std::env::var("jwt").unwrap_or_default());
713            }
714            if !self.get_jwt().is_empty() {
715                debug!("Signing in with JWT");
716                let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
717                let loginresponse = self.signin(signin).await;
718                match loginresponse {
719                    Ok(response) => match response.user {
720                        Some(user) => {
721                            self.reset_reconnect_ms();
722                            info!("Signed in as {}", user.username);
723                            self.set_connected(ClientState::Connected, None);
724                            Ok(())
725                        }
726                        None => {
727                            self.reset_reconnect_ms();
728                            info!("Signed in as guest");
729                            self.set_connected(ClientState::Connected, None);
730                            Ok(())
731                            // Err(OpenIAPError::ClientError("Signin returned no user object".to_string()))
732                        }
733                    },
734                    Err(_e) => {
735                        self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
736                        Err(_e)
737                    }
738                }
739            } else {
740                self.reset_reconnect_ms();
741                match self.get_element().await {
742                    Ok(_) => {
743                        debug!("Connected, No credentials provided so is running as guest");
744                        self.set_connected(ClientState::Connected, None);
745                        Ok(())
746                    },
747                    Err(e) => {
748                        self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
749                        Err(e)
750                    }
751                }
752            }
753        }
754    }
755    /// Reconnect will attempt to reconnect to the OpenIAP server.
756    #[tracing::instrument(skip_all)]
757    pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
758        let state = self.get_state();
759        if state == ClientState::Connected || state == ClientState::Signedin {
760            return Ok(());
761        }
762        if !self.is_auto_reconnect() {
763            return Ok(());   
764        }
765        let client = self.get_client();
766    
767        match client {
768            ClientEnum::WS(ref _client) => {
769                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
770                self.setup_ws(&self.get_url()).await?;
771                debug!("Completed reconnecting to websocket");
772                self.post_connected().await
773            }
774            ClientEnum::Grpc(ref _client) => {
775                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
776                match self.setup_grpc_stream().await {
777                    Ok(_) => {
778                        debug!("Completed reconnecting to gRPC");
779                        self.post_connected().await
780                    },
781                    Err(e) => {
782                        return Err(OpenIAPError::ClientError(format!(
783                            "Failed to setup gRPC stream: {}",
784                            e
785                        )));
786                    }
787                }
788            }
789            ClientEnum::None => {
790                return Err(OpenIAPError::ClientError("Invalid client".to_string()));
791            }
792        }
793    }
794    /// Disconnect the client from the OpenIAP server.
795    pub fn disconnect(&self) {
796        self.set_auto_reconnect(false);
797        self.set_connected(ClientState::Disconnected, Some("Disconnected"));
798    }
799    /// Set the connected flag to true or false
800    pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
801        {
802            let current = self.get_state();
803            trace!("Set connected: {:?} from {:?}", state, current);
804            if state == ClientState::Connected && current == ClientState::Signedin {
805                self.set_state(ClientState::Signedin);
806            } else {
807                self.set_state(state.clone());
808            }
809            if state == ClientState::Disconnected && !current.eq(&state) {
810                let me = self.clone();
811                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
812                    tokio::task::spawn(async move {
813                        let mut reply_queue_guard = me.rpc_reply_queue.lock().await;
814                        let mut callback_guard = me.rpc_callback.lock().await;
815                        *reply_queue_guard = None;
816                        *callback_guard = None;
817                    });
818                }
819            }
820            if state == ClientState::Connecting && !current.eq(&state) {
821                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
822                    self.stats.lock().unwrap().connection_attempts += 1;
823                    let me = self.clone();
824                    tokio::task::spawn(async move {
825                        me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
826                    });
827                }
828
829            }
830            if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) { 
831                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
832                    self.stats.lock().unwrap().connections += 1;
833                    let me = self.clone();
834                    tokio::task::spawn(async move {
835                        me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
836                    });
837                }
838            }
839            if state == ClientState::Signedin && current != ClientState::Signedin {
840                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
841                    let me = self.clone();
842                    tokio::task::spawn(async move {
843                        me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
844                    });
845                }
846            }
847            if state == ClientState::Disconnected && !current.eq(&state) {
848                if message.is_some() {
849                    debug!("Disconnected: {}", message.unwrap());
850                } else {
851                    debug!("Disconnected");
852                }
853                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
854                    let me = self.clone();
855                    let message = match message {
856                        Some(message) => message.to_string(),
857                        None => "".to_string(),
858                    };
859                        tokio::task::spawn(async move {
860                            me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
861                        });
862                }
863
864                self.kill_handles();
865                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
866                    let client = self.clone();
867                        tokio::task::spawn(async move {
868                        {
869                            let inner = client.inner.lock().await;
870                            let mut queries = inner.queries.lock().await;
871                            let ids = queries.keys().cloned().collect::<Vec<String>>();
872                            debug!("********************************************** Cleaning up");
873                            for id in ids {
874                                let err = ErrorResponse {
875                                    code: 500,
876                                    message: "Disconnected".to_string(),
877                                    stack: "".to_string(),
878                                };
879                                let envelope = err.to_envelope();
880                                let tx = queries.remove(&id).unwrap();
881                                debug!("kill query: {}", id);
882                                let _ = tx.send(envelope);
883                            }
884                            let mut streams = inner.streams.lock().await;
885                            let ids = streams.keys().cloned().collect::<Vec<String>>();
886                            for id in ids {
887                                let tx = streams.remove(&id).unwrap();
888                                debug!("kill stream: {}", id);
889                                let _ = tx.send(Vec::new()).await;
890                            }
891                            let mut queues = inner.queues.lock().await;
892                            let ids = queues.keys().cloned().collect::<Vec<String>>();
893                            for id in ids {
894                                let _ = queues.remove(&id).unwrap();
895                            }
896                            let mut watches = inner.watches.lock().await;
897                            let ids = watches.keys().cloned().collect::<Vec<String>>();
898                            for id in ids {
899                                let _ = watches.remove(&id).unwrap();
900                            }
901                            debug!("**********************************************************");
902                        }
903                        if client.is_auto_reconnect() {
904                            trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
905                            tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
906                            if client.is_auto_reconnect() {
907                                client.inc_reconnect_ms();
908                                // let mut client = client.clone();
909                                trace!("Reconnecting . . .");
910                                client.reconnect().await.unwrap_or_else(|e| {
911                                    error!("Failed to reconnect: {}", e);
912                                    client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
913                                });
914                            } else {
915                                debug!("Not reconnecting");
916                            }
917                        } else {
918                            debug!("Reconnecting disabled, stop now");
919                        }
920                    });
921                }
922    
923            }
924        }
925    }
926    /// Get client state
927    pub fn get_state(&self) -> ClientState {
928        let conn = self.state.lock().unwrap();
929        conn.clone()
930    }
931    /// Set client state
932    pub fn set_state(&self, state: ClientState) {
933        let mut conn = self.state.lock().unwrap();
934        *conn = state;
935    }
936    /// Set the msgcount value
937    pub fn set_msgcount(&self, msgcount: i32) {
938        let mut current = self.msgcount.lock().unwrap();
939        trace!("Set msgcount: {} from {}", msgcount, *current);
940        *current = msgcount;
941    }
942    /// Increment the msgcount value
943    pub fn inc_msgcount(&self) -> i32 {
944        let mut current = self.msgcount.lock().unwrap();
945        *current += 1;
946        *current
947    }
948    /// Return value of reconnect_ms
949    pub fn get_reconnect_ms(&self) -> i32 {
950        let reconnect_ms = self.reconnect_ms.lock().unwrap();
951        *reconnect_ms
952    }
953    /// Increment the reconnect_ms value
954    pub fn reset_reconnect_ms(&self) {
955        let mut current = self.reconnect_ms.lock().unwrap();
956        *current = 500;
957    }
958    /// Increment the reconnect_ms value
959    pub fn inc_reconnect_ms(&self) -> i32 {
960        let mut current = self.reconnect_ms.lock().unwrap();
961        if *current < 30000 {
962            *current += 500;
963        }
964        *current
965    }
966    
967    /// Push tokio task handle to the task_handles vector
968    pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
969        let mut handles = self.task_handles.lock().unwrap();
970        handles.push(handle);
971    }
972    /// Kill all tokio task handles in the task_handles vector
973    pub fn kill_handles(&self) {
974        let mut handles = self.task_handles.lock().unwrap();
975        for handle in handles.iter() {
976            // let id = handle.id();
977            // debug!("Killing handle {}", id);
978            debug!("Killing handle");
979            if !handle.is_finished() {
980                handle.abort();
981            }
982        }
983        handles.clear();
984        // if let Ok(_handle) = tokio::runtime::Handle::try_current() {
985        //     let runtime = self.get_runtime();
986        //     if let Some(rt) = runtime.lock().unwrap().take() {
987        //         rt.shutdown_background();
988        //     }
989        // }
990    }
991
992
993    /// Return value of the msgcount flag
994    #[tracing::instrument(skip_all)]
995    fn get_msgcount(&self) -> i32 {
996        let msgcount = self.msgcount.lock().unwrap();
997        *msgcount
998    }
999    /// Set the default timeout for the client commands
1000    pub fn set_default_timeout(&self, timeout: Duration) {
1001        let mut current = self.default_timeout.lock().unwrap();
1002        trace!("Set default_timeout: {} from {:?}", timeout.as_secs(), current.as_secs());
1003        *current = timeout;
1004    }
1005    /// Return the default timeout for the client commands
1006    pub fn get_default_timeout(&self) -> Duration {
1007        let current = self.default_timeout.lock().unwrap();
1008        current.clone()
1009    }
1010    /// Set the connect_called flag to true or false
1011    #[tracing::instrument(skip_all)]
1012    pub fn set_connect_called(&self, connect_called: bool) {
1013        let mut current = self.connect_called.lock().unwrap();
1014        trace!("Set connect_called: {} from {}", connect_called, *current);
1015        *current = connect_called;
1016    }
1017    /// Return value of the connect_called flag
1018    #[tracing::instrument(skip_all)]
1019    fn is_connect_called(&self) -> bool {
1020        let connect_called = self.connect_called.lock().unwrap();
1021        *connect_called
1022    }
1023    /// Set the auto_reconnect flag to true or false
1024    #[tracing::instrument(skip_all)]
1025    pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1026        let mut current = self.auto_reconnect.lock().unwrap();
1027        trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1028        *current = auto_reconnect;
1029    }
1030    /// Return value of the auto_reconnect flag
1031    #[tracing::instrument(skip_all)]
1032    fn is_auto_reconnect(&self) -> bool {
1033        let auto_reconnect = self.auto_reconnect.lock().unwrap();
1034        *auto_reconnect
1035    }
1036    /// Set the url flag to true or false
1037    #[tracing::instrument(skip_all)]
1038    pub fn set_url(&self, url: &str) {
1039        let mut current = self.url.lock().unwrap();
1040        trace!("Set url: {} from {}", url, *current);
1041        *current = url.to_string();
1042    }
1043    /// Return value of the url string
1044    #[tracing::instrument(skip_all)]
1045    fn get_url(&self) -> String {
1046        let url = self.url.lock().unwrap();
1047        url.to_string()
1048    }
1049    /// Set the username flag to true or false
1050    #[tracing::instrument(skip_all)]
1051    pub fn set_username(&self, username: &str) {
1052        let mut current = self.username.lock().unwrap();
1053        trace!("Set username: {} from {}", username, *current);
1054        *current = username.to_string();
1055    }
1056    /// Return value of the username string
1057    #[tracing::instrument(skip_all)]
1058    fn get_username(&self) -> String {
1059        let username = self.username.lock().unwrap();
1060        username.to_string()
1061    }
1062    /// Set the password value
1063    #[tracing::instrument(skip_all)]
1064    pub fn set_password(&self, password: &str) {
1065        let mut current = self.password.lock().unwrap();
1066        trace!("Set password: {} from {}", password, *current);
1067        *current = password.to_string();
1068    }
1069    /// Return value of the password string
1070    #[tracing::instrument(skip_all)]
1071    fn get_password(&self) -> String {
1072        let password = self.password.lock().unwrap();
1073        password.to_string()
1074    }
1075    /// Set the jwt flag to true or false
1076    #[tracing::instrument(skip_all)]
1077    pub fn set_jwt(&self, jwt: &str) {
1078        let mut current = self.jwt.lock().unwrap();
1079        trace!("Set jwt: {} from {}", jwt, *current);
1080        *current = jwt.to_string();
1081    }
1082    /// Return value of the jwt string
1083    #[tracing::instrument(skip_all)]
1084    fn get_jwt(&self) -> String {
1085        let jwt = self.jwt.lock().unwrap();
1086        jwt.to_string()
1087    }
1088    
1089    /// Set the service name
1090    #[tracing::instrument(skip_all)]
1091    pub fn set_service_name(&self, service_name: &str) {
1092        let mut current = self.service_name.lock().unwrap();
1093        trace!("Set servicename: {} from {}", service_name, *current);
1094        *current = service_name.to_string();
1095    }
1096    /// Return value of the service name string
1097    #[tracing::instrument(skip_all)]
1098    pub fn get_service_name(&self) -> String {
1099        let servicename = self.service_name.lock().unwrap();
1100        servicename.to_string()
1101    }
1102    /// Set the agent name
1103    #[tracing::instrument(skip_all)]
1104    pub fn set_agent_name(&self, agent: &str) {
1105        let mut current = self.agent_name.lock().unwrap();
1106        trace!("Set agent: {} from {}", agent, *current);
1107        *current = agent.to_string();
1108    }
1109    /// Return value of the agent string
1110    #[tracing::instrument(skip_all)]
1111    pub fn get_agent_name(&self) -> String {
1112        let agent = self.agent_name.lock().unwrap();
1113        agent.to_string()
1114    }
1115    /// Set the agent version number
1116    #[tracing::instrument(skip_all)]
1117    pub fn set_agent_version(&self, version: &str) {
1118        let mut current = self.agent_version.lock().unwrap();
1119        trace!("Set agent version: {} from {}", version, *current);
1120        *current = version.to_string();
1121    }
1122    /// Return value of the agent version string
1123    #[tracing::instrument(skip_all)]
1124    pub fn get_agent_version(&self) -> String {
1125        let agent_version = self.agent_version.lock().unwrap();
1126        agent_version.to_string()
1127    }
1128    
1129    /// Set the config flag to true or false
1130    #[tracing::instrument(skip_all)]
1131    pub fn set_config(&self, config: Option<Config>) {
1132        let mut current = self.config.lock().unwrap();
1133        *current = config;
1134    }
1135    /// Return value of the config 
1136    #[tracing::instrument(skip_all)]
1137    pub fn get_config(&self) -> Option<Config> {
1138        let config = self.config.lock().unwrap();
1139        config.clone()
1140    }
1141    /// Set the client flag to true or false
1142    #[tracing::instrument(skip_all)]
1143    pub fn set_client(&self, client: ClientEnum) {
1144        let mut current = self.client.lock().unwrap();
1145        *current = client;
1146    }
1147    /// Return value of the client
1148    #[tracing::instrument(skip_all)]
1149    fn get_client(&self) -> ClientEnum {
1150        let client = self.client.lock().unwrap();
1151        client.clone()
1152    }
1153    /// Set the user flag to true or false
1154    #[tracing::instrument(skip_all)]
1155    pub fn set_user(&self, user: Option<User>) {
1156        let mut current = self.user.lock().unwrap();
1157        *current = user;
1158    }
1159    /// Return value of the user
1160    #[tracing::instrument(skip_all)]
1161    pub fn get_user(&self) -> Option<User> {
1162        let user = self.user.lock().unwrap();
1163        user.clone()
1164    }
1165    // /// Return the signed in user, if we are signed in.
1166    // #[tracing::instrument(skip_all)]
1167    // pub fn get_user(&self) -> Option<User> {
1168    //     // let inner = self.inner.lock().await;
1169    //     // inner.user.clone()
1170    //     self.user.clone()
1171    // }
1172    
1173    /// Set the runtime flag to true or false
1174    #[tracing::instrument(skip_all)]
1175    pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1176        let mut current = self.runtime.lock().unwrap();
1177        *current = runtime;
1178    }
1179    /// Return value of the runtime
1180    #[tracing::instrument(skip_all)]
1181    // pub fn get_runtime(&self) -> Option<Arc<tokio::runtime::Runtime>> {
1182    pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1183        self.runtime.as_ref()
1184    }
1185    /// Return value of the runtime handle
1186    #[tracing::instrument(skip_all)]
1187    pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1188        let mut rt = self.runtime.lock().unwrap();
1189        if rt.is_none() {
1190            // println!("Rust: Initializing new Tokio runtime");
1191            let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1192            *rt = Some(runtime);
1193        } else {
1194            // println!("Rust: Runtime already initialized");
1195        }
1196        rt.as_ref().unwrap().handle().clone()
1197    }
1198    /// Method to allow the user to subscribe with a callback function
1199    #[tracing::instrument(skip_all)]
1200    pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1201    {
1202        // call the callback function every time there is an event in the client.event_receiver
1203        let event_receiver = self.event_receiver.clone();
1204        let callback = callback;
1205        let _handle =  tokio::task::spawn(async move {
1206            while let Ok(event) = event_receiver.recv().await {
1207                callback(event);
1208            }
1209        }); // .unwrap();
1210    }
1211    /// Internal function, used to generate a unique id for each message sent to the server.
1212    #[tracing::instrument(skip_all)]
1213    pub fn get_uniqueid() -> String {
1214        static COUNTER: AtomicUsize = AtomicUsize::new(1);
1215        let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1216        let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1217        let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1218        let sqids = Sqids::default();
1219        sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1220    }
1221    /// Internal function, Send a message to the OpenIAP server, and wait for a response.
1222    #[tracing::instrument(skip_all)]
1223    async fn send(&self, msg: Envelope, timeout: Option<tokio::time::Duration>) -> Result<Envelope, OpenIAPError> {
1224        let response = self.send_noawait(msg).await;
1225        match response {
1226            Ok((response_rx, id)) => {
1227                let timeout = match timeout {
1228                    Some(t) => t,
1229                    None => self.get_default_timeout()
1230                };
1231                let result = tokio::time::timeout(timeout, response_rx).await;
1232                // Remove the entry from `inner.queries` after awaiting
1233                let inner = self.inner.lock().await;
1234                inner.queries.lock().await.remove(&id);
1235
1236                match result {
1237                    Ok(Ok(response)) => Ok(response),
1238                    Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
1239                    Err(_) => Err(OpenIAPError::ClientError("Request timed out".to_string())),
1240                }
1241                // // Await the response
1242                // let response = response_rx.await;
1243    
1244                // // Remove the entry from `inner.queries` after awaiting
1245                // let inner = self.inner.lock().await;
1246                // inner.queries.lock().await.remove(&id);
1247    
1248                // // Handle the result of the await
1249                // match response {
1250                //     Ok(response) => Ok(response),
1251                //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1252                // }
1253            }
1254            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1255        }
1256    }
1257    /// Internal function, Send a message to the OpenIAP server, and do not wait for a response.
1258    /// used when sending a stream of data, or when we do not need a response.
1259    #[tracing::instrument(skip_all)]
1260    async fn send_noawait(
1261        &self,
1262        mut msg: Envelope,
1263    ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1264        let (response_tx, response_rx) = oneshot::channel();
1265        let id = Client::get_uniqueid();
1266        msg.id = id.clone();
1267    
1268        // Lock and insert the sender into `inner.queries`
1269        {
1270            let inner = self.inner.lock().await;
1271            inner.queries.lock().await.insert(id.clone(), response_tx);
1272        }
1273    
1274        // Send the message and check for errors
1275        let res = self.send_envelope(msg).await;
1276        if let Err(e) = res {
1277            // Remove the entry from `inner.queries` if the send fails
1278            let inner = self.inner.lock().await;
1279            inner.queries.lock().await.remove(&id);
1280            return Err(OpenIAPError::ClientError(e.to_string()));
1281        }
1282    
1283        Ok((response_rx, id))
1284    }
1285    /// Internal function, Setup a new stream, send a message to the OpenIAP server, and return a stream to send and receive data.
1286    #[tracing::instrument(skip_all)]
1287    async fn sendwithstream(
1288        &self,
1289        mut msg: Envelope,
1290    ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1291        let (response_tx, response_rx) = oneshot::channel();
1292        let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1293        let id = Client::get_uniqueid();
1294        msg.id = id.clone();
1295        {
1296            let inner = self.inner.lock().await;
1297            inner.queries.lock().await.insert(id.clone(), response_tx);
1298            inner.streams.lock().await.insert(id.clone(), stream_tx);
1299            let res = self.send_envelope(msg).await;
1300            match res {
1301                Ok(_) => (),
1302                Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1303            }
1304        }
1305        Ok((response_rx, stream_rx))
1306    }
1307    #[tracing::instrument(skip_all, target = "openiap::client")]
1308    async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1309        if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin ) 
1310            && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1311            return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1312        }
1313        let env = envelope.clone();
1314        let command = envelope.command.clone();
1315        self.stats.lock().unwrap().package_tx += 1;
1316        match command.as_str() {
1317            "signin" => { self.stats.lock().unwrap().signin += 1;},
1318            "upload" => { self.stats.lock().unwrap().upload += 1;},
1319            "download" => { self.stats.lock().unwrap().download += 1;},
1320            "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1321            "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1322            "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1323            "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1324            "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1325            "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1326            "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1327
1328            "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1329            "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1330            "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1331            "watch" => { self.stats.lock().unwrap().watch += 1;},
1332            "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1333            "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1334
1335            "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1336            "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1337            "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1338            "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1339            "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1340            "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1341            "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1342            "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1343
1344            "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1345            "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1346            "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1347            "query" => { self.stats.lock().unwrap().query += 1;},
1348            "count" => { self.stats.lock().unwrap().count += 1;},
1349            "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1350            "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1351            "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1352            "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1353            "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1354            "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1355            "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1356            "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1357            "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1358            "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1359            _ => {}
1360        };
1361        if envelope.id.is_empty() {
1362            let id = Client::get_uniqueid();
1363            envelope.id = id.clone();
1364        }
1365        trace!("Sending {} message, in the thread", command);
1366        let res = self.out_envelope_sender.send(env).await;
1367        if res.is_err() {
1368            error!("{:?}", res);
1369            let errmsg = res.unwrap_err().to_string();
1370            self.set_connected(ClientState::Disconnected, Some(&errmsg));
1371            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1372        } else {
1373            return Ok(())
1374        }
1375    }
1376    #[tracing::instrument(skip_all, target = "openiap::client")]
1377    async fn parse_incomming_envelope(&self, received: Envelope) {
1378        self.stats.lock().unwrap().package_rx += 1;
1379        let command = received.command.clone();
1380        trace!("parse_incomming_envelope, command: {}", command);
1381        let inner = self.inner.lock().await;
1382        let rid = received.rid.clone();
1383        let mut queries = inner.queries.lock().await;
1384        let mut streams = inner.streams.lock().await;
1385        let watches = inner.watches.lock().await;
1386        let queues = inner.queues.lock().await;
1387    
1388        if command != "ping" && command != "pong" && command != "refreshtoken" {
1389            if rid.is_empty() {
1390                debug!("Received #{} #{} {} message", received.seq, received.id, command);
1391            } else {
1392                debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1393            }
1394        } else if rid.is_empty() {
1395            trace!("Received #{} #{} {} message", received.seq, received.id, command);
1396        } else {
1397            trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1398        }
1399        
1400        if command == "ping" {
1401            self.pong(&received.id).await;
1402            // self.event_sender.send(crate::ClientEvent::Ping).await.unwrap();
1403        } else if command == "refreshtoken" {
1404            // TODO: Do we store the new jwt at some point in the future
1405        } else if command == "beginstream"
1406            || command == "stream"
1407            || command == "endstream"
1408        {
1409            let streamresponse: Stream =
1410                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1411            let streamdata = streamresponse.data;
1412            if !streamdata.is_empty() {
1413                let stream = streams.get(rid.as_str()).unwrap();
1414    
1415                match stream.send(streamdata).await {
1416                    Ok(_) => _ = (),
1417                    Err(e) => error!("Failed to send data: {}", e),
1418                }
1419            }
1420            if command == "endstream" {
1421                let _ = streams.remove(rid.as_str());
1422            }
1423        } else if command == "watchevent" {
1424            let watchevent: WatchEvent =
1425                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1426            if let Some(callback) = watches.get(watchevent.id.as_str()) {
1427                callback(watchevent);
1428            }
1429        } else if command == "queueevent" {
1430            let queueevent: QueueEvent =
1431                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1432            if let Some(callback) = queues.get(queueevent.queuename.as_str()).cloned() {
1433                let queuename = queueevent.replyto.clone();
1434                let correlation_id = queueevent.correlation_id.clone();
1435                let me = self.clone();
1436                tokio::spawn(async move {
1437                    let result_fut = callback(Arc::new(me.clone()), queueevent);
1438                    let result = result_fut.await;
1439                    if result.is_some() && !queuename.is_empty() {
1440                        debug!("Sending return value from queue event callback to {}", queuename);
1441                        let result = result.unwrap();
1442                        let q = QueueMessageRequest {
1443                            queuename,
1444                            correlation_id,
1445                            data: result,
1446                            striptoken: true,
1447                            ..Default::default()
1448                        };
1449                        let e = q.to_envelope();
1450                        let send_result = me.send(e, None).await;
1451                        if let Err(e) = send_result {
1452                            error!("Failed to send queue event response: {}", e);
1453                        }
1454                    }
1455                });
1456            }
1457        } else if let Some(response_tx) = queries.remove(&rid) {
1458            let stream = streams.get(rid.as_str());
1459            if let Some(stream) = stream {
1460                let streamdata = vec![];
1461                match stream.send(streamdata).await {
1462                    Ok(_) => _ = (),
1463                    Err(e) => error!("Failed to send data: {}", e),
1464                }
1465            }
1466            let _ = response_tx.send(received);
1467        } else {
1468            error!("Received unhandled {} message: {:?}", command, received);
1469        }    
1470    }    
1471    /// Internal function, used to send a fake getelement to the OpenIAP server.
1472    #[tracing::instrument(skip_all)]
1473    async fn get_element(&self) -> Result<(), OpenIAPError> {
1474        let id = Client::get_uniqueid();
1475        let envelope = Envelope {
1476            id: id.clone(),
1477            command: "getelement".into(),
1478            ..Default::default()
1479        };
1480        let result = match self.send(envelope, None).await {
1481            Ok(res) => res,
1482            Err(e) => {
1483                return Err(e);
1484            },
1485        };
1486        if result.command == "pong" || result.command == "getelement" {
1487            Ok(())
1488        } else if result.command == "error" {
1489            let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1490            Err(OpenIAPError::ServerError(e.message))
1491        } else {
1492            Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1493        }
1494    }
1495    /// Internal function, used to send a ping to the OpenIAP server.
1496    #[tracing::instrument(skip_all)]
1497    async fn ping(&self) -> Result<(), OpenIAPError> {
1498        let id = Client::get_uniqueid();
1499        let envelope = Envelope {
1500            id: id.clone(),
1501            command: "getelement".into(),
1502            ..Default::default()
1503        };
1504        match self.send_envelope(envelope).await {
1505            Ok(_res) => Ok(()),
1506            Err(e) => {
1507                return Err(e);
1508            },
1509        }
1510    }
1511    /// Internal function, used to send a pong response to the OpenIAP server.
1512    #[tracing::instrument(skip_all)]
1513    async fn pong(&self, rid: &str) {
1514        let id = Client::get_uniqueid();
1515        let envelope = Envelope {
1516            id: id.clone(),
1517            command: "pong".into(),
1518            rid: rid.to_string(),
1519            ..Default::default()
1520        };
1521        match self.send_envelope(envelope).await {
1522            Ok(_) => (),
1523            Err(e) => error!("Failed to send pong: {}", e),
1524        }
1525    }
1526    /// Sign in to the OpenIAP service. \
1527    /// If no username and password is provided, it will attempt to use environment variables.\
1528    /// if config is set to validateonly, it will only validate the credentials, but not sign in.\
1529    /// If no jwt, username and password is provided, it will attempt to use environment variables.\
1530    /// will prefere OPENIAP_JWT (or jwt) over OPENIAP_USERNAME and OPENIAP_PASSWORD.
1531    #[tracing::instrument(skip_all)]
1532    pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1533        // autodetect how to signin using environment variables
1534        if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1535            if config.jwt.is_empty() {
1536                config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1537            }
1538            if config.jwt.is_empty() {
1539                config.jwt = std::env::var("jwt").unwrap_or_default();
1540            }
1541            // if no jwt was found, test for username and password
1542            if config.jwt.is_empty() {
1543                if config.username.is_empty() {
1544                    config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1545                }
1546                if config.password.is_empty() {
1547                    config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1548                }
1549            }
1550        }
1551        let version = env!("CARGO_PKG_VERSION");
1552        if !version.is_empty() && config.version.is_empty() {
1553            config.version = version.to_string();
1554        }
1555        if config.agent.is_empty() {
1556            config.agent = self.get_agent_name();
1557        }
1558
1559        // trace!("Attempting sign-in using {:?}", config);
1560        let envelope = config.to_envelope();
1561        let result = self.send(envelope, None).await;
1562
1563        match &result {
1564            Ok(m) => {
1565                debug!("Sign-in reply received");
1566                if m.command == "error" {
1567                    let e: ErrorResponse =
1568                        prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1569                            .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1570                    return Err(OpenIAPError::ServerError(e.message));
1571                }
1572                debug!("Sign-in successful");
1573                let response: SigninResponse =
1574                    prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1575                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1576                if !config.validateonly {
1577                    self.set_connected(ClientState::Signedin, None);
1578                    self.set_user(Some(response.user.as_ref().unwrap().clone()));
1579                }
1580                Ok(response)
1581            }
1582            Err(e) => {
1583                debug!("Sending Sign-in request failed {:?}", result);
1584                debug!("Sign-in failed: {}", e.to_string());
1585                if !config.validateonly {
1586                    self.set_user(None);
1587                }
1588                Err(OpenIAPError::ClientError(e.to_string()))
1589            }
1590        }
1591    }
1592    /// Return a list of collections in the database
1593    /// - includehist: include historical collections, default is false.
1594    /// please see create_collection for examples on how to create collections.
1595    #[tracing::instrument(skip_all)]
1596    pub async fn list_collections(&self, includehist: bool) -> Result<String, OpenIAPError> {
1597        let config = ListCollectionsRequest::new(includehist);
1598        let envelope = config.to_envelope();
1599        let result = self.send(envelope, None).await;
1600        match result {
1601            Ok(m) => {
1602                let data = match m.data {
1603                    Some(data) => data,
1604                    None => {
1605                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1606                    }
1607                };
1608                if m.command == "error" {
1609                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1610                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1611                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1612                }
1613                let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1614                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1615                Ok(response.results)
1616            }
1617            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1618        }
1619    }
1620    /// Create a new collection in the database.
1621    /// You can create a collection by simply adding a new document to it using [Client::insert_one].
1622    /// Or you can create a collecting using the following example:
1623    /// ```
1624    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1625    /// #[tokio::main]
1626    /// async fn main() -> Result<(), OpenIAPError> {
1627    ///     let client = Client::new_connect("").await?;
1628    ///     //let collections = client.list_collections(false).await?;
1629    ///     //println!("Collections: {}", collections);
1630    ///     let config = CreateCollectionRequest::byname("rusttestcollection");
1631    ///     client.create_collection(config).await?;
1632    ///     let config = DropCollectionRequest::byname("rusttestcollection");
1633    ///     client.drop_collection(config).await?;
1634    ///     Ok(())
1635    /// }
1636    /// ```
1637    /// You can create a normal collection with a TTL index on the _created field, using the following example:
1638    /// ```
1639    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1640    /// #[tokio::main]
1641    /// async fn main() -> Result<(), OpenIAPError> {
1642    ///     let client = Client::new_connect("").await?;
1643    ///     let config = CreateCollectionRequest::with_ttl(
1644    ///         "rusttestttlcollection",
1645    ///         60
1646    ///     );
1647    ///     client.create_collection(config).await?;
1648    ///     let config = DropCollectionRequest::byname("rusttestttlcollection");
1649    ///     client.drop_collection(config).await?;
1650    ///     Ok(())
1651    /// }
1652    /// ```
1653    /// You can create a time series collection using the following example:
1654    /// granularity can be one of: seconds, minutes, hours
1655    /// ```
1656    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1657    /// #[tokio::main]
1658    /// async fn main() -> Result<(), OpenIAPError> {
1659    ///     let client = Client::new_connect("").await?;
1660    ///     let config = CreateCollectionRequest::timeseries(
1661    ///         "rusttesttscollection2",
1662    ///         "_created",
1663    ///         "minutes"
1664    ///     );
1665    ///     client.create_collection(config).await?;
1666    ///     let config = DropCollectionRequest::byname("rusttesttscollection2");
1667    ///     client.drop_collection(config).await?;
1668    ///     Ok(())
1669    /// }
1670    /// ```
1671    #[tracing::instrument(skip_all)]
1672    pub async fn create_collection(
1673        &self,
1674        config: CreateCollectionRequest,
1675    ) -> Result<(), OpenIAPError> {
1676        if config.collectionname.is_empty() {
1677            return Err(OpenIAPError::ClientError(
1678                "No collection name provided".to_string(),
1679            ));
1680        }
1681        let envelope = config.to_envelope();
1682        let result = self.send(envelope, None).await;
1683        match result {
1684            Ok(m) => {
1685                let data = match m.data {
1686                    Some(data) => data,
1687                    None => {
1688                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1689                    }
1690                };
1691                if m.command == "error" {
1692                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1693                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1694                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1695                }
1696                Ok(())
1697            }
1698            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1699        }
1700    }
1701    /// Drop a collection from the database, this will delete all data and indexes for the collection.
1702    /// See [Client::create_collection] for examples on how to create a collection.
1703    #[tracing::instrument(skip_all)]
1704    pub async fn drop_collection(&self, config: DropCollectionRequest) -> Result<(), OpenIAPError> {
1705        if config.collectionname.is_empty() {
1706            return Err(OpenIAPError::ClientError(
1707                "No collection name provided".to_string(),
1708            ));
1709        }
1710        let envelope = config.to_envelope();
1711        let result = self.send(envelope, None).await;
1712        match result {
1713            Ok(m) => {
1714                let data = match m.data {
1715                    Some(data) => data,
1716                    None => {
1717                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1718                    }
1719                };
1720                if m.command == "error" {
1721                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1722                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1723                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1724                }
1725                Ok(())
1726            }
1727            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1728        }
1729    }
1730    /// Return all indexes for a collection in the database
1731    /// ```
1732    /// use openiap_client::{Client, GetIndexesRequest, OpenIAPError};
1733    /// #[tokio::main]
1734    /// async fn main() -> Result<(), OpenIAPError> {
1735    ///     let client = Client::new_connect("").await?;
1736    ///     let config = GetIndexesRequest::bycollectionname("rustindextestcollection");
1737    ///     let indexes = client.get_indexes(config).await?;
1738    ///     println!("Indexes: {}", indexes);
1739    ///     Ok(())
1740    /// }
1741    /// ```
1742    /// 
1743    pub async fn get_indexes(&self, config: GetIndexesRequest) -> Result<String, OpenIAPError> {
1744        if config.collectionname.is_empty() {
1745            return Err(OpenIAPError::ClientError(
1746                "No collection name provided".to_string(),
1747            ));
1748        }
1749        let envelope = config.to_envelope();
1750        let result = self.send(envelope, None).await;
1751        match result {
1752            Ok(m) => {
1753                let data = match m.data {
1754                    Some(data) => data,
1755                    None => {
1756                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1757                    }
1758                };
1759                if m.command == "error" {
1760                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1761                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1762                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1763                }
1764                let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1765                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1766                Ok(response.results)
1767            }
1768            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1769        }
1770    }
1771    /// Create an index in the database.
1772    /// Example of creating an index on the name field in the rustindextestcollection collection, and then dropping it again:
1773    /// ```
1774    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1775    /// #[tokio::main]
1776    /// async fn main() -> Result<(), OpenIAPError> {
1777    ///     let client = Client::new_connect("").await?;
1778    ///     let config = CreateIndexRequest::bycollectionname(
1779    ///         "rustindextestcollection",
1780    ///         "{\"name\": 1}"
1781    ///     );
1782    ///     client.create_index(config).await?;
1783    ///     let config = DropIndexRequest::bycollectionname(
1784    ///         "rustindextestcollection",
1785    ///         "name_1"
1786    ///     );
1787    ///     client.drop_index(config).await?;
1788    ///     Ok(())
1789    /// }
1790    /// ```
1791    /// Example of creating an unique index on the address field in the rustindextestcollection collection, and then dropping it again:
1792    /// ```
1793    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1794    /// #[tokio::main]
1795    /// async fn main() -> Result<(), OpenIAPError> {
1796    ///     let client = Client::new_connect("").await?;
1797    ///     let mut config = CreateIndexRequest::bycollectionname(
1798    ///         "rustindextestcollection",
1799    ///         "{\"address\": 1}"
1800    ///     );
1801    ///     config.options = "{\"unique\": true}".to_string();
1802    ///     client.create_index(config).await?;
1803    ///     let config = DropIndexRequest::bycollectionname(
1804    ///         "rustindextestcollection",
1805    ///         "address_1"
1806    ///     );
1807    ///     client.drop_index(config).await?;
1808    ///     Ok(())
1809    /// }
1810    /// ```
1811    pub async fn create_index(&self, config: CreateIndexRequest) -> Result<(), OpenIAPError> {
1812        if config.collectionname.is_empty() {
1813            return Err(OpenIAPError::ClientError(
1814                "No collection name provided".to_string(),
1815            ));
1816        }
1817        if config.index.is_empty() {
1818            return Err(OpenIAPError::ClientError(
1819                "No index was provided".to_string(),
1820            ));
1821        }
1822        let envelope = config.to_envelope();
1823        let result = self.send(envelope, None).await;
1824        match result {
1825            Ok(m) => {
1826                let data = match m.data {
1827                    Some(data) => data,
1828                    None => {
1829                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1830                    }
1831                };
1832                if m.command == "error" {
1833                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1834                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1835                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1836                }
1837                Ok(())
1838            }
1839            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1840        }
1841    }
1842    /// Drop an index from the database
1843    /// See [Client::create_index] for an example on how to create and drop an index.
1844    pub async fn drop_index(&self, config: DropIndexRequest) -> Result<(), OpenIAPError> {
1845        if config.collectionname.is_empty() {
1846            return Err(OpenIAPError::ClientError(
1847                "No collection name provided".to_string(),
1848            ));
1849        }
1850        if config.name.is_empty() {
1851            return Err(OpenIAPError::ClientError(
1852                "No index name provided".to_string(),
1853            ));
1854        }
1855        let envelope = config.to_envelope();
1856        let result = self.send(envelope, None).await;
1857        match result {
1858            Ok(m) => {
1859                let data = match m.data {
1860                    Some(data) => data,
1861                    None => {
1862                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1863                    }
1864                };
1865                if m.command == "error" {
1866                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1867                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1868                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1869                }
1870                Ok(())
1871            }
1872            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1873        }
1874    }
1875    /// To query all documents in the entities collection where _type is test, you can use the following example:
1876    /// ```
1877    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1878    /// #[tokio::main]
1879    /// async fn main() -> Result<(), OpenIAPError> {
1880    ///     let client = Client::new_connect("").await?;
1881    ///     let q = client.query( QueryRequest::with_query(
1882    ///         "entities",
1883    ///         "{\"_type\":\"test\"}"
1884    ///     )).await?;
1885    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1886    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1887    ///     for item in items {
1888    ///         println!("Item: {:?}", item);
1889    ///     }
1890    ///     Ok(())
1891    /// }
1892    /// ```
1893    /// To query all documents in the entities collection, and only return the name and _id field for all documents, you can use the following example:
1894    /// ```
1895    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1896    /// #[tokio::main]
1897    /// async fn main() -> Result<(), OpenIAPError> {
1898    ///     let client = Client::new_connect("").await?;
1899    ///     let q = client.query( QueryRequest::with_projection(
1900    ///         "entities",
1901    ///         "{}",
1902    ///         "{\"name\":1}"
1903    ///     )).await?;
1904    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1905    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1906    ///     for item in items {
1907    ///         println!("Item: {:?}", item);
1908    ///     }
1909    ///     Ok(())
1910    /// }
1911    /// ```
1912    #[tracing::instrument(skip_all)]
1913    pub async fn query(&self, mut config: QueryRequest) -> Result<QueryResponse, OpenIAPError> {
1914        if config.collectionname.is_empty() {
1915            config.collectionname = "entities".to_string();
1916        }
1917
1918        let envelope = config.to_envelope();
1919        let result = self.send(envelope, None).await;
1920        match result {
1921            Ok(m) => {
1922                let data = match m.data {
1923                    Some(data) => data,
1924                    None => {
1925                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1926                    }
1927                };
1928                if m.command == "error" {
1929                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1930                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1931                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1932                }
1933                let response: QueryResponse = prost::Message::decode(data.value.as_ref())
1934                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1935                debug!("Return Ok(response)");
1936                Ok(response)
1937            }
1938            Err(e) => {
1939                debug!("Error !!");
1940                Err(OpenIAPError::ClientError(e.to_string()))
1941            }
1942        }
1943    }
1944    /// Try and get a single document from the database.\
1945    /// If no document is found, it will return None.
1946    /// ```
1947    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1948    /// #[tokio::main]
1949    /// async fn main() -> Result<(), OpenIAPError> {
1950    ///     let client = Client::new_connect("").await?;
1951    ///     let config = QueryRequest::with_query(
1952    ///         "users",
1953    ///         "{\"username\":\"guest\"}"
1954    ///     );
1955    ///     let item = client.get_one(config).await;
1956    ///     match item {
1957    ///         Some(item) => {
1958    ///             assert_eq!(item["username"], "guest");
1959    ///             println!("Item: {:?}", item);
1960    ///         }
1961    ///         None => {
1962    ///             println!("No item found");
1963    ///             assert!(false, "No item found");
1964    ///         }
1965    ///     }
1966    ///     Ok(())
1967    /// }
1968    /// ```
1969    #[tracing::instrument(skip_all)]
1970    pub async fn get_one(&self, mut config: QueryRequest) -> Option<serde_json::Value> {
1971        if config.collectionname.is_empty() {
1972            config.collectionname = "entities".to_string();
1973        }
1974        config.top = 1;
1975        let envelope = config.to_envelope();
1976        let result = self.send(envelope, None).await;
1977        match result {
1978            Ok(m) => {
1979                let data = match m.data {
1980                    Some(data) => data,
1981                    None => return None,
1982                };
1983                if m.command == "error" {
1984                    return None;
1985                }
1986                let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
1987
1988                let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
1989                let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1990                if items.is_empty() {
1991                    return None;
1992                }
1993                let item = items[0].clone();
1994                Some(item)
1995            }
1996            Err(_) => None,
1997        }
1998    }
1999
2000    /// Try and get a specefic version of a document from the database, reconstructing it from the history collection
2001    /// ```
2002    /// use openiap_client::{OpenIAPError, Client, GetDocumentVersionRequest, InsertOneRequest, UpdateOneRequest};
2003    /// #[tokio::main]
2004    /// async fn main() -> Result<(), OpenIAPError> {
2005    ///     let client = Client::new_connect("").await?;
2006    ///     let item = "{\"name\": \"test from rust\", \"_type\": \"test\"}";
2007    ///     let query = InsertOneRequest {
2008    ///         collectionname: "entities".to_string(),
2009    ///         item: item.to_string(),
2010    ///         j: true,
2011    ///         w: 2,
2012    ///         ..Default::default()
2013    ///     };
2014    ///     let response = client.insert_one(query).await;
2015    ///     let response = match response {
2016    ///         Ok(r) => r,
2017    ///         Err(e) => {
2018    ///             println!("Error: {:?}", e);
2019    ///             assert!(false, "insert_one failed with {:?}", e);
2020    ///             return Ok(());
2021    ///         }
2022    ///     };
2023    ///     let _obj: serde_json::Value = serde_json::from_str(&response.result).unwrap();
2024    ///     let _id = _obj["_id"].as_str().unwrap();
2025    ///     let item = format!("{{\"name\":\"updated from rust\", \"_id\": \"{}\"}}", _id);
2026    ///     let query = UpdateOneRequest {
2027    ///         collectionname: "entities".to_string(),
2028    ///         item: item.to_string(),
2029    ///         ..Default::default()
2030    ///     };
2031    ///     let response = client.update_one(query).await;
2032    ///     _ = match response {
2033    ///         Ok(r) => r,
2034    ///         Err(e) => {
2035    ///             println!("Error: {:?}", e);
2036    ///             assert!(false, "update_one failed with {:?}", e);
2037    ///             return Ok(());
2038    ///         }
2039    ///     };
2040    ///     let query = GetDocumentVersionRequest {
2041    ///         collectionname: "entities".to_string(),
2042    ///         id: _id.to_string(),
2043    ///         version: 0,
2044    ///         ..Default::default()
2045    ///     };
2046    ///     let response = client.get_document_version(query).await;
2047    ///     let response = match response {
2048    ///         Ok(r) => r,
2049    ///         Err(e) => {
2050    ///             println!("Error: {:?}", e);
2051    ///             assert!(false, "get_document_version failed with {:?}", e);
2052    ///             return Ok(());
2053    ///         }
2054    ///     };
2055    ///     let _obj = serde_json::from_str(&response);
2056    ///     let _obj: serde_json::Value = match _obj {
2057    ///         Ok(r) => r,
2058    ///         Err(e) => {
2059    ///             println!("Error: {:?}", e);
2060    ///             assert!(
2061    ///                 false,
2062    ///                 "parse get_document_version result failed with {:?}",
2063    ///                 e
2064    ///             );
2065    ///             return Ok(());
2066    ///         }
2067    ///     };
2068    ///     let name = _obj["name"].as_str().unwrap();
2069    ///     let version = _obj["_version"].as_i64().unwrap();
2070    ///     println!("version 0 Name: {}, Version: {}", name, version);
2071    ///     assert_eq!(name, "test from rust");
2072    ///     let query = GetDocumentVersionRequest {
2073    ///         collectionname: "entities".to_string(),
2074    ///         id: _id.to_string(),
2075    ///         version: 1,
2076    ///         ..Default::default()
2077    ///     };
2078    ///     let response = client.get_document_version(query).await;
2079    ///     assert!(
2080    ///         response.is_ok(),
2081    ///         "test_get_document_version failed with {:?}",
2082    ///         response.err().unwrap()
2083    ///     );
2084    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2085    ///     let name = _obj["name"].as_str().unwrap();
2086    ///     let version = _obj["_version"].as_i64().unwrap();
2087    ///     println!("version 1 Name: {}, Version: {}", name, version);
2088    ///     assert_eq!(name, "updated from rust");
2089    ///     let query = GetDocumentVersionRequest {
2090    ///         collectionname: "entities".to_string(),
2091    ///         id: _id.to_string(),
2092    ///         version: -1,
2093    ///         ..Default::default()
2094    ///     };
2095    ///     let response = client.get_document_version(query).await;
2096    ///     assert!(
2097    ///         response.is_ok(),
2098    ///         "test_get_document_version failed with {:?}",
2099    ///         response.err().unwrap()
2100    ///     );
2101    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2102    ///     let name = _obj["name"].as_str().unwrap();
2103    ///     let version = _obj["_version"].as_i64().unwrap();
2104    ///     println!("version -1 Name: {}, Version: {}", name, version);
2105    ///     assert_eq!(name, "updated from rust");
2106    ///     Ok(())
2107    /// }
2108    /// ```
2109    #[tracing::instrument(skip_all)]
2110    pub async fn get_document_version(
2111        &self,
2112        mut config: GetDocumentVersionRequest,
2113    ) -> Result<String, OpenIAPError> {
2114        if config.collectionname.is_empty() {
2115            config.collectionname = "entities".to_string();
2116        }
2117        if config.id.is_empty() {
2118            return Err(OpenIAPError::ClientError("No id provided".to_string()));
2119        }
2120        let envelope = config.to_envelope();
2121        let result = self.send(envelope, None).await;
2122        match result {
2123            Ok(m) => {
2124                let data = match m.data {
2125                    Some(data) => data,
2126                    None => {
2127                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2128                    }
2129                };
2130                if m.command == "error" {
2131                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2132                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2133                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2134                }
2135                let response: GetDocumentVersionResponse =
2136                    prost::Message::decode(data.value.as_ref())
2137                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2138                Ok(response.result)
2139            }
2140            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2141        }
2142    }
2143    /// Run an aggregate pipeline towards the database
2144    /// Example of running an aggregate pipeline on the entities collection, counting the number of documents with _type=test, and grouping them by name:
2145    /// ```
2146    /// use openiap_client::{OpenIAPError, Client, AggregateRequest};
2147    /// #[tokio::main]
2148    /// async fn main() -> Result<(), OpenIAPError> {
2149    ///    let client = Client::new_connect("").await?;
2150    ///     let config = AggregateRequest {
2151    ///         collectionname: "entities".to_string(),
2152    ///         aggregates: "[{\"$match\": {\"_type\": \"test\"}}, {\"$group\": {\"_id\": \"$name\", \"count\": {\"$sum\": 1}}}]".to_string(),
2153    ///         ..Default::default()
2154    ///     };
2155    ///     let response = client.aggregate(config).await?;
2156    ///     println!("Response: {:?}", response);
2157    ///     Ok(())
2158    /// }
2159    /// ```
2160    /// 
2161    #[tracing::instrument(skip_all)]
2162    pub async fn aggregate(
2163        &self,
2164        mut config: AggregateRequest,
2165    ) -> Result<AggregateResponse, OpenIAPError> {
2166        if config.collectionname.is_empty() {
2167            config.collectionname = "entities".to_string();
2168        }
2169        if config.hint.is_empty() {
2170            config.hint = "".to_string();
2171        }
2172        if config.queryas.is_empty() {
2173            config.queryas = "".to_string();
2174        }
2175        if config.aggregates.is_empty() {
2176            return Err(OpenIAPError::ClientError(
2177                "No aggregates provided".to_string(),
2178            ));
2179        }
2180        let envelope = config.to_envelope();
2181        let result = self.send(envelope, None).await;
2182        match result {
2183            Ok(m) => {
2184                let data = match m.data {
2185                    Some(data) => data,
2186                    None => {
2187                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2188                    }
2189                };
2190                if m.command == "error" {
2191                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2192                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2193                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2194                }
2195                let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2196                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2197                Ok(response)
2198            }
2199            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2200        }
2201    }
2202    /// Count the number of documents in a collection, with an optional query
2203    #[tracing::instrument(skip_all)]
2204    pub async fn count(&self, mut config: CountRequest) -> Result<CountResponse, OpenIAPError> {
2205        if config.collectionname.is_empty() {
2206            config.collectionname = "entities".to_string();
2207        }
2208        if config.query.is_empty() {
2209            config.query = "{}".to_string();
2210        }
2211        let envelope = config.to_envelope();
2212        let result = self.send(envelope, None).await;
2213        match result {
2214            Ok(m) => {
2215                let data = match m.data {
2216                    Some(data) => data,
2217                    None => {
2218                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2219                    }
2220                };
2221                if m.command == "error" {
2222                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2223                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2224                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2225                }
2226                let response: CountResponse = prost::Message::decode(data.value.as_ref())
2227                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2228                Ok(response)
2229            }
2230            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2231        }
2232    }
2233    /// Get distinct values for a field in a collection, with an optional query
2234    #[tracing::instrument(skip_all)]
2235    pub async fn distinct(
2236        &self,
2237        mut config: DistinctRequest,
2238    ) -> Result<DistinctResponse, OpenIAPError> {
2239        if config.collectionname.is_empty() {
2240            config.collectionname = "entities".to_string();
2241        }
2242        if config.query.is_empty() {
2243            config.query = "{}".to_string();
2244        }
2245        if config.field.is_empty() {
2246            return Err(OpenIAPError::ClientError("No field provided".to_string()));
2247        }
2248        let envelope = config.to_envelope();
2249        let result = self.send(envelope, None).await;
2250        match result {
2251            Ok(m) => {
2252                let data = match m.data {
2253                    Some(data) => data,
2254                    None => {
2255                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2256                    }
2257                };
2258                if m.command == "error" {
2259                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2260                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2261                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2262                }
2263                let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2264                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2265                Ok(response)
2266            }
2267            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2268        }
2269    }
2270    /// Insert a document into a collection
2271    #[tracing::instrument(skip_all)]
2272    pub async fn insert_one(
2273        &self,
2274        config: InsertOneRequest,
2275    ) -> Result<InsertOneResponse, OpenIAPError> {
2276        let envelope = config.to_envelope();
2277        let result = self.send(envelope, None).await;
2278        match result {
2279            Ok(m) => {
2280                let data = match m.data {
2281                    Some(data) => data,
2282                    None => {
2283                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2284                    }
2285                };
2286                if m.command == "error" {
2287                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2288                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2289                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2290                }
2291                let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2292                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2293                Ok(response)
2294            }
2295            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2296        }
2297    }
2298    /// Insert many documents into a collection
2299    #[tracing::instrument(skip_all)]
2300    pub async fn insert_many(
2301        &self,
2302        config: InsertManyRequest,
2303    ) -> Result<InsertManyResponse, OpenIAPError> {
2304        let envelope = config.to_envelope();
2305        let result = self.send(envelope, None).await;
2306        match result {
2307            Ok(m) => {
2308                let data = match m.data {
2309                    Some(data) => data,
2310                    None => {
2311                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2312                    }
2313                };
2314                if m.command == "error" {
2315                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2316                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2317                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2318                }
2319                let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2320                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2321                Ok(response)
2322            }
2323            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2324        }
2325    }
2326    /// Update ( replace ) a document in a collection
2327    #[tracing::instrument(skip_all)]
2328    pub async fn update_one(
2329        &self,
2330        config: UpdateOneRequest,
2331    ) -> Result<UpdateOneResponse, OpenIAPError> {
2332        let envelope = config.to_envelope();
2333        let result = self.send(envelope, None).await;
2334        match result {
2335            Ok(m) => {
2336                let data = match m.data {
2337                    Some(data) => data,
2338                    None => {
2339                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2340                    }
2341                };
2342                if m.command == "error" {
2343                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2344                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2345                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2346                }
2347                let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2348                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2349                Ok(response)
2350            }
2351            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2352        }
2353    }
2354    /// Using a unique key, insert a document or update it if it already exists ( upsert on steroids )
2355    #[tracing::instrument(skip_all)]
2356    pub async fn insert_or_update_one(
2357        &self,
2358        config: InsertOrUpdateOneRequest,
2359    ) -> Result<String, OpenIAPError> {
2360        let envelope = config.to_envelope();
2361        let result = self.send(envelope, None).await;
2362        match result {
2363            Ok(m) => {
2364                let data = match m.data {
2365                    Some(data) => data,
2366                    None => {
2367                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2368                    }
2369                };
2370                if m.command == "error" {
2371                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2372                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2373                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2374                }
2375                let response: InsertOrUpdateOneResponse =
2376                    prost::Message::decode(data.value.as_ref())
2377                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2378                Ok(response.result)
2379            }
2380            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2381        }
2382    }
2383    /// Using a unique key, insert many documents or update them if they already exist ( upsert on steroids )
2384    #[tracing::instrument(skip_all)]
2385    pub async fn insert_or_update_many(
2386        &self,
2387        config: InsertOrUpdateManyRequest,
2388    ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2389        let envelope = config.to_envelope();
2390        let result = self.send(envelope, None).await;
2391        match result {
2392            Ok(m) => {
2393                let data = match m.data {
2394                    Some(data) => data,
2395                    None => {
2396                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2397                    }
2398                };
2399                if m.command == "error" {
2400                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2401                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2402                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2403                }
2404                let response: InsertOrUpdateManyResponse =
2405                    prost::Message::decode(data.value.as_ref())
2406                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2407                Ok(response)
2408            }
2409            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2410        }
2411    }
2412    /// Update one or more documents in a collection using a update document
2413    #[tracing::instrument(skip_all)]
2414    pub async fn update_document(
2415        &self,
2416        config: UpdateDocumentRequest,
2417    ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2418        let envelope = config.to_envelope();
2419        let result = self.send(envelope, None).await;
2420        match result {
2421            Ok(m) => {
2422                let data = match m.data {
2423                    Some(data) => data,
2424                    None => {
2425                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2426                    }
2427                };
2428                if m.command == "error" {
2429                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2430                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2431                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2432                }
2433                let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2434                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2435                Ok(response)
2436            }
2437            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2438        }
2439    }
2440    /// Delete a document from a collection using a unique key
2441    #[tracing::instrument(skip_all)]
2442    pub async fn delete_one(&self, config: DeleteOneRequest) -> Result<i32, OpenIAPError> {
2443        let envelope = config.to_envelope();
2444        let result = self.send(envelope, None).await;
2445        match result {
2446            Ok(m) => {
2447                let data = match m.data {
2448                    Some(data) => data,
2449                    None => {
2450                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2451                    }
2452                };
2453                if m.command == "error" {
2454                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2455                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2456                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2457                }
2458                let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2459                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2460                Ok(response.affectedrows)
2461            }
2462            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2463        }
2464    }
2465    /// Delete many documents from a collection using a query or list of unique keys
2466    #[tracing::instrument(skip_all)]
2467    pub async fn delete_many(&self, config: DeleteManyRequest) -> Result<i32, OpenIAPError> {
2468        let envelope = config.to_envelope();
2469        let result = self.send(envelope, None).await;
2470        match result {
2471            Ok(m) => {
2472                let data = match m.data {
2473                    Some(data) => data,
2474                    None => {
2475                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2476                    }
2477                };
2478                if m.command == "error" {
2479                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2480                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2481                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2482                }
2483                let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2484                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2485                Ok(response.affectedrows)
2486            }
2487            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2488        }
2489    }
2490    /// Download a file from the database
2491    #[tracing::instrument(skip_all)]
2492    pub async fn download(
2493        &self,
2494        config: DownloadRequest,
2495        folder: Option<&str>,
2496        filename: Option<&str>,
2497    ) -> Result<DownloadResponse, OpenIAPError> {
2498        let envelope = config.to_envelope();
2499        match self.sendwithstream(envelope).await {
2500            Ok((response_rx, mut stream_rx)) => {
2501                let temp_file_path = util::generate_unique_filename("openiap");
2502                debug!("Temp file: {:?}", temp_file_path);
2503                let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2504                    OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2505                })?;
2506                while !stream_rx.is_closed() {
2507                    match stream_rx.recv().await {
2508                        Some(received) => {
2509                            if received.is_empty() {
2510                                debug!("Stream closed");
2511                                break;
2512                            }
2513                            debug!("Received {} bytes", received.len());
2514                            temp_file.write_all(&received).map_err(|e| {
2515                                OpenIAPError::ClientError(format!(
2516                                    "Failed to write to temp file: {}",
2517                                    e
2518                                ))
2519                            })?;
2520                        }
2521                        None => {
2522                            debug!("Stream closed");
2523                            break;
2524                        }
2525                    }
2526                }
2527                temp_file.sync_all().map_err(|e| {
2528                    OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2529                })?;
2530
2531                let response = response_rx.await.map_err(|_| {
2532                    OpenIAPError::ClientError("Failed to receive response".to_string())
2533                })?;
2534
2535                if response.command == "error" {
2536                    let data = match response.data {
2537                        Some(data) => data,
2538                        None => {
2539                            return Err(OpenIAPError::ClientError(
2540                                "No data returned for SERVER error".to_string(),
2541                            ));
2542                        }
2543                    };
2544                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2545                    return Err(OpenIAPError::ServerError(e.message));
2546                }
2547                let mut downloadresponse: DownloadResponse =
2548                    prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2549
2550                let mut final_filename = match &filename {
2551                    Some(f) => f,
2552                    None => downloadresponse.filename.as_str(),
2553                };
2554                if final_filename.is_empty() {
2555                    final_filename = downloadresponse.filename.as_str();
2556                }
2557                let mut folder = match &folder {
2558                    Some(f) => f,
2559                    None => ".",
2560                };
2561                if folder.is_empty() {
2562                    folder = ".";
2563                }
2564                let filepath = format!("{}/{}", folder, final_filename);
2565                trace!("Moving file to {}", filepath);
2566                util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2567                    OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2568                })?;
2569                debug!("Downloaded file to {}", filepath);
2570                downloadresponse.filename = filepath;
2571
2572                Ok(downloadresponse)
2573            }
2574            Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2575        }
2576    }
2577    /// Upload a file to the database
2578    #[tracing::instrument(skip_all)]
2579    pub async fn upload(
2580        &self,
2581        config: UploadRequest,
2582        filepath: &str,
2583    ) -> Result<UploadResponse, OpenIAPError> {
2584        // debug!("upload: Uploading file: {}", filepath);
2585        // let mut file = File::open(filepath)
2586        //     .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2587        // let chunk_size = 1024 * 1024;
2588        // let mut buffer = vec![0; chunk_size];
2589
2590        // let envelope = config.to_envelope();
2591        // let (response_rx, rid) = self.send_noawait(envelope).await?;
2592        // {
2593        //     let envelope = BeginStream::from_rid(rid.clone());
2594        //     debug!("Sending beginstream to #{}", rid);
2595        //     self.send_envelope(envelope).await.map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2596        //     let mut counter = 0;
2597
2598        //     loop {
2599        //         let bytes_read = file.read(&mut buffer).map_err(|e| {
2600        //             OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2601        //         })?;
2602        //         counter += 1;
2603
2604        //         if bytes_read == 0 {
2605        //             break;
2606        //         }
2607
2608        //         let chunk = buffer[..bytes_read].to_vec();
2609        //         let envelope = Stream::from_rid(chunk, rid.clone());
2610        //         debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2611        //         self.send_envelope(envelope).await.map_err(|e| {
2612        //             OpenIAPError::ClientError(format!("Failed to send data: {}", e))
2613        //         })?
2614        //     }
2615
2616        //     let envelope = EndStream::from_rid(rid.clone());
2617        //     debug!("Sending endstream to #{}", rid);
2618        //     self.send_envelope(envelope).await
2619        //         .map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2620        // }
2621
2622        // debug!("Wait for upload response for #{}", rid);
2623        // match response_rx.await {
2624        //     Ok(response) => {
2625        //         if response.command == "error" {
2626        //             let error_response: ErrorResponse = prost::Message::decode(
2627        //                 response.data.unwrap().value.as_ref(),
2628        //             )
2629        //             .map_err(|e| {
2630        //                 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2631        //             })?;
2632        //             return Err(OpenIAPError::ServerError(error_response.message));
2633        //         }
2634        //         let upload_response: UploadResponse =
2635        //             prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2636        //                 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2637        //             })?;
2638        //         Ok(upload_response)
2639        //     }
2640        //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2641        // }
2642        debug!("upload: Uploading file: {}", filepath);
2643        let mut file = File::open(filepath)
2644            .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2645        let chunk_size = 1024 * 1024;
2646        let mut buffer = vec![0; chunk_size];
2647    
2648        // Send the initial upload request
2649        let envelope = config.to_envelope();
2650        let (response_rx, rid) = self.send_noawait(envelope).await?;
2651        
2652        // Send the BeginStream message
2653        let envelope = BeginStream::from_rid(rid.clone());
2654        debug!("Sending beginstream to #{}", rid);
2655        if let Err(e) = self.send_envelope(envelope).await {
2656            let inner = self.inner.lock().await;
2657            inner.queries.lock().await.remove(&rid);
2658            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2659        }
2660    
2661        // Send file chunks
2662        let mut counter = 0;
2663        loop {
2664            let bytes_read = file.read(&mut buffer).map_err(|e| {
2665                OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2666            })?;
2667            counter += 1;
2668    
2669            if bytes_read == 0 {
2670                break;
2671            }
2672    
2673            let chunk = buffer[..bytes_read].to_vec();
2674            let envelope = Stream::from_rid(chunk, rid.clone());
2675            debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2676            if let Err(e) = self.send_envelope(envelope).await {
2677                let inner = self.inner.lock().await;
2678                inner.queries.lock().await.remove(&rid);
2679                return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2680            }
2681        }
2682    
2683        // Send the EndStream message
2684        let envelope = EndStream::from_rid(rid.clone());
2685        debug!("Sending endstream to #{}", rid);
2686        if let Err(e) = self.send_envelope(envelope).await {
2687            let inner = self.inner.lock().await;
2688            inner.queries.lock().await.remove(&rid);
2689            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2690        }
2691    
2692        // Await the response and clean up `inner.queries` afterward
2693        debug!("Wait for upload response for #{}", rid);
2694        let result = response_rx.await;
2695        let inner = self.inner.lock().await;
2696        inner.queries.lock().await.remove(&rid);
2697    
2698        match result {
2699            Ok(response) => {
2700                if response.command == "error" {
2701                    let error_response: ErrorResponse = prost::Message::decode(
2702                        response.data.unwrap().value.as_ref(),
2703                    )
2704                    .map_err(|e| {
2705                        OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2706                    })?;
2707                    return Err(OpenIAPError::ServerError(error_response.message));
2708                }
2709                let upload_response: UploadResponse =
2710                    prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2711                        OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2712                    })?;
2713                Ok(upload_response)
2714            }
2715            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2716        }
2717    }
2718    /// Watch for changes in a collection ( change stream )
2719    #[tracing::instrument(skip_all)]
2720    pub async fn watch(
2721        &self,
2722        mut config: WatchRequest,
2723        callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2724    ) -> Result<String, OpenIAPError> {
2725        if config.collectionname.is_empty() {
2726            config.collectionname = "entities".to_string();
2727        }
2728        if config.paths.is_empty() {
2729            config.paths = vec!["".to_string()];
2730        }
2731
2732        let envelope = config.to_envelope();
2733        let result = self.send(envelope, None).await;
2734        match result {
2735            Ok(m) => {
2736                let data = match m.data {
2737                    Some(data) => data,
2738                    None => {
2739                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2740                    }
2741                };
2742                if m.command == "error" {
2743                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2744                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2745                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2746                }
2747                let response: WatchResponse = prost::Message::decode(data.value.as_ref())
2748                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2749
2750                let inner = self.inner.lock().await;
2751                inner
2752                    .watches
2753                    .lock()
2754                    .await
2755                    .insert(response.id.clone(), callback);
2756
2757                Ok(response.id)
2758            }
2759            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2760        }
2761    }
2762    /// Cancel a watch ( change stream )
2763    #[tracing::instrument(skip_all)]
2764    pub async fn unwatch(&self, id: &str) -> Result<(), OpenIAPError> {
2765        let config = UnWatchRequest::byid(id);
2766        let envelope = config.to_envelope();
2767        let result = self.send(envelope, None).await;
2768        match result {
2769            Ok(m) => {
2770                let data = match m.data {
2771                    Some(data) => data,
2772                    None => {
2773                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2774                    }
2775                };
2776                if m.command == "error" {
2777                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2778                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2779                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2780                }
2781                Ok(())
2782            }
2783            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2784        }
2785    }
2786    /// Register a queue for messaging ( amqp ) in the OpenIAP service
2787    #[tracing::instrument(skip_all)]
2788    pub async fn register_queue(
2789        &self,
2790        mut config: RegisterQueueRequest,
2791        callback: QueueCallbackFn,
2792    ) -> Result<String, OpenIAPError> {
2793        if config.queuename.is_empty() {
2794            config.queuename = "".to_string();
2795        }
2796
2797        let envelope = config.to_envelope();
2798        let result = self.send(envelope, None).await;
2799        match result {
2800            Ok(m) => {
2801                let data = match m.data {
2802                    Some(data) => data,
2803                    None => {
2804                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2805                    }
2806                };
2807                if m.command == "error" {
2808                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2809                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2810                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2811                }
2812                let response: RegisterQueueResponse =
2813                    prost::Message::decode(data.value.as_ref())
2814                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2815
2816                let inner = self.inner.lock().await;
2817                inner
2818                    .queues
2819                    .lock()
2820                    .await
2821                    .insert(response.queuename.clone(), callback);
2822
2823                Ok(response.queuename)
2824            }
2825            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2826        }
2827    }
2828    /// Unregister a queue or exchange for messaging ( amqp ) in the OpenIAP service
2829    #[tracing::instrument(skip_all)]
2830    pub async fn unregister_queue(&self, queuename: &str) -> Result<(), OpenIAPError> {
2831        let config = UnRegisterQueueRequest::byqueuename(queuename);
2832        let envelope = config.to_envelope();
2833        let result = self.send(envelope, None).await;
2834        match result {
2835            Ok(m) => {
2836                let data = match m.data {
2837                    Some(data) => data,
2838                    None => {
2839                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2840                    }
2841                };
2842                if m.command == "error" {
2843                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2844                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2845                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2846                }
2847                Ok(())
2848            }
2849            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2850        }
2851    }
2852    /// Register a exchange for messaging ( amqp ) in the OpenIAP service
2853    #[tracing::instrument(skip_all)]
2854    pub async fn register_exchange(
2855        &self,
2856        mut config: RegisterExchangeRequest,
2857        callback: QueueCallbackFn,
2858    ) -> Result<String, OpenIAPError> {
2859        if config.exchangename.is_empty() {
2860            return Err(OpenIAPError::ClientError(
2861                "No exchange name provided".to_string(),
2862            ));
2863        }
2864        if config.algorithm.is_empty() {
2865            config.algorithm = "fanout".to_string();
2866        }
2867        let envelope = config.to_envelope();
2868        let result = self.send(envelope, None).await;
2869        match result {
2870            Ok(m) => {
2871                let data = match m.data {
2872                    Some(data) => data,
2873                    None => {
2874                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2875                    }
2876                };
2877                if m.command == "error" {
2878                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2879                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2880                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2881                }
2882                let response: RegisterExchangeResponse =
2883                    prost::Message::decode(data.value.as_ref())
2884                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2885                if !response.queuename.is_empty() {
2886                    let inner = self.inner.lock().await;
2887                    inner
2888                        .queues
2889                        .lock()
2890                        .await
2891                        .insert(response.queuename.clone(), callback);
2892                }
2893                Ok(response.queuename)
2894            }
2895            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2896        }
2897    }
2898    /// Send a message to a queue or exchange in the OpenIAP service
2899    #[tracing::instrument(skip_all)]
2900    pub async fn queue_message(
2901        &self,
2902        config: QueueMessageRequest,
2903    ) -> Result<QueueMessageResponse, OpenIAPError> {
2904        if config.queuename.is_empty() && config.exchangename.is_empty() {
2905            return Err(OpenIAPError::ClientError(
2906                "No queue or exchange name provided".to_string(),
2907            ));
2908        }
2909        let envelope = config.to_envelope();
2910        let result = self.send(envelope, None).await;
2911        match result {
2912            Ok(m) => {
2913                let data = match m.data {
2914                    Some(d) => d,
2915                    None => {
2916                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
2917                    }
2918                };
2919                if m.command == "error" {
2920                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2921                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2922                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2923                }
2924                let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
2925                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2926                Ok(response)
2927            }
2928            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2929        }
2930    }
2931    /// Send message to a queue or exchange in the OpenIAP service, and wait for a reply
2932    #[tracing::instrument(skip_all)]
2933    pub async fn rpc(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
2934        if config.queuename.is_empty() && config.exchangename.is_empty() {
2935            return Err(OpenIAPError::ClientError(
2936                "No queue or exchange name provided".to_string(),
2937            ));
2938        }
2939
2940        let (tx, rx) = oneshot::channel::<String>();
2941        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
2942
2943        // Prepare the callback
2944        let callback: QueueCallbackFn = Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
2945            if let Some(tx) = tx.lock().unwrap().take() {
2946                let _ = tx.send(event.data);
2947            } else {
2948                debug!("Queue already closed");
2949            }
2950            Box::pin(async { None })
2951        });
2952
2953        // Check if we already have a reply queue and callback registered
2954        let mut reply_queue_guard = self.rpc_reply_queue.lock().await;
2955        let mut callback_guard = self.rpc_callback.lock().await;
2956
2957        // If not registered, or if connection state is not Connected/Signedin, re-register
2958        let state = self.get_state();
2959        if reply_queue_guard.is_none() || !(state == ClientState::Connected || state == ClientState::Signedin) {
2960            // Register a new reply queue
2961            let q = self
2962                .register_queue(
2963                    RegisterQueueRequest {
2964                        queuename: "".to_string(),
2965                    },
2966                    callback.clone(),
2967                )
2968                .await?;
2969            *reply_queue_guard = Some(q.clone());
2970            *callback_guard = Some(callback.clone());
2971        } else {
2972            // Already registered, but need to update the callback for this call
2973            if let Some(qname) = reply_queue_guard.as_ref() {
2974                let inner = self.inner.lock().await;
2975                inner.queues.lock().await.insert(qname.clone(), callback.clone());
2976            }
2977        }
2978
2979        let q = reply_queue_guard.as_ref().unwrap().clone();
2980        config.replyto = q.clone();
2981        let envelope = config.to_envelope();
2982
2983        let result = self.send(envelope, None).await;
2984        let rpc_result = match result {
2985            Ok(m) => {
2986                let data = match m.data {
2987                    Some(d) => d,
2988                    None => {
2989                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
2990                    }
2991                };
2992                if m.command == "error" {
2993                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2994                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2995                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2996                }
2997
2998                match tokio::time::timeout(timeout, rx).await {
2999                    Ok(Ok(val)) => Ok(val),
3000                    Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3001                    Err(_) => {
3002                        // Timeout: clear the cached queue so it will be re-registered next time
3003                        *reply_queue_guard = None;
3004                        *callback_guard = None;
3005                        Err(OpenIAPError::ClientError("RPC request timed out".to_string()))
3006                    },
3007                }
3008            }
3009            Err(e) => {
3010                // If we get an error, clear the cached queue so it will be re-registered next time
3011                *reply_queue_guard = None;
3012                *callback_guard = None;
3013                Err(OpenIAPError::ClientError(e.to_string()))
3014            }
3015        };
3016
3017        rpc_result
3018    }
3019    // pub async fn rpc2(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
3020    //     if config.queuename.is_empty() && config.exchangename.is_empty() {
3021    //         return Err(OpenIAPError::ClientError(
3022    //             "No queue or exchange name provided".to_string(),
3023    //         ));
3024    //     }
3025
3026    //     let (tx, rx) = oneshot::channel::<String>();
3027    //     let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3028
3029    //     let q = self
3030    //         .register_queue(
3031    //             RegisterQueueRequest {
3032    //                 queuename: "".to_string(),
3033    //             },
3034    //             Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3035    //                 if let Some(tx) = tx.lock().unwrap().take() {
3036    //                     let _ = tx.send(event.data);
3037    //                 } else {
3038    //                     debug!("Queue already closed");
3039    //                 }
3040    //                 Box::pin(async { None })
3041    //             }),
3042    //         )
3043    //         .await
3044    //         .unwrap();
3045
3046    //     config.replyto = q.clone();
3047    //     let envelope = config.to_envelope();
3048
3049    //     let result = self.send(envelope, None).await;
3050    //     let rpc_result = match result {
3051    //         Ok(m) => {
3052    //             let data = match m.data {
3053    //                 Some(d) => d,
3054    //                 None => {
3055    //                     return Err(OpenIAPError::ClientError("No data in response".to_string()))
3056    //                 }
3057    //             };
3058    //             if m.command == "error" {
3059    //                 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3060    //                     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3061    //                 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3062    //             }
3063
3064    //             match tokio::time::timeout(timeout, rx).await {
3065    //                 Ok(Ok(val)) => Ok(val),
3066    //                 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3067    //                 Err(_) => Err(OpenIAPError::ClientError("RPC request timed out".to_string())),
3068    //             }
3069    //         }
3070    //         Err(e) => {
3071    //             let unregister_err = self.unregister_queue(&q).await.err();
3072    //             if unregister_err.is_some() {
3073    //                 error!("Failed to unregister Response Queue: {:?}", unregister_err);
3074    //             }
3075    //             Err(OpenIAPError::ClientError(e.to_string()))
3076    //         }
3077    //     };
3078
3079    //     if let Err(e) = self.unregister_queue(&q).await {
3080    //         error!("Failed to unregister Response Queue: {:?}", e);
3081    //     } else {
3082    //         debug!("Unregistered Response Queue: {:?}", q);
3083    //     }
3084    //     match rpc_result {
3085    //         Ok(val) => Ok(val),
3086    //         Err(e) => Err(e),
3087    //     }
3088    // }
3089    /// Push a new workitem to a workitem queue
3090    /// If the file is less than 5 megabytes it will be attached to the workitem
3091    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3092    #[tracing::instrument(skip_all)]
3093    pub async fn push_workitem(
3094        &self,
3095        mut config: PushWorkitemRequest,
3096    ) -> Result<PushWorkitemResponse, OpenIAPError> {
3097        if config.wiq.is_empty() && config.wiqid.is_empty() {
3098            return Err(OpenIAPError::ClientError(
3099                "No queue name or id provided".to_string(),
3100            ));
3101        }
3102        for f in &mut config.files {
3103            if f.filename.is_empty() && f.file.is_empty() {
3104                debug!("Filename is empty");
3105            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3106                // does file exist?
3107                if !std::path::Path::new(&f.filename).exists() {
3108                    debug!("File does not exist: {}", f.filename);
3109                } else {
3110                    let filesize = std::fs::metadata(&f.filename).unwrap().len();
3111                    // if filesize is less than 5 meggabytes attach it, else upload
3112                    if filesize < 5 * 1024 * 1024 {
3113                        debug!("File {} exists so ATTACHING it.", f.filename);
3114                        let filename = std::path::Path::new(&f.filename)
3115                            .file_name()
3116                            .unwrap()
3117                            .to_str()
3118                            .unwrap();
3119                        f.file = std::fs::read(&f.filename).unwrap();
3120                        // f.file = compress_file(&f.filename).unwrap();
3121                        // f.compressed = false;
3122                        f.file = util::compress_file_to_vec(&f.filename).unwrap();
3123                        f.compressed = true;
3124                        f.filename = filename.to_string();
3125                        f.id = "findme".to_string();
3126                        trace!(
3127                            "File {} was read and assigned to f.file, size: {}",
3128                            f.filename,
3129                            f.file.len()
3130                        );
3131                    } else {
3132                        debug!("File {} exists so UPLOADING it.", f.filename);
3133                        let filename = std::path::Path::new(&f.filename)
3134                            .file_name()
3135                            .unwrap()
3136                            .to_str()
3137                            .unwrap();
3138                        let uploadconfig = UploadRequest {
3139                            filename: filename.to_string(),
3140                            collectionname: "fs.files".to_string(),
3141                            ..Default::default()
3142                        };
3143                        let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3144                        trace!("File {} was upload as {}", filename, uploadresult.id);
3145                        // f.filename = "".to_string();
3146                        f.id = uploadresult.id.clone();
3147                        f.filename = filename.to_string();
3148                    }
3149                }
3150            } else {
3151                debug!("File {} is already uploaded", f.filename);
3152            }
3153        }
3154        let envelope = config.to_envelope();
3155        let result = self.send(envelope, None).await;
3156        match result {
3157            Ok(m) => {
3158                let data = match m.data {
3159                    Some(data) => data,
3160                    None => {
3161                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3162                    }
3163                };
3164                if m.command == "error" {
3165                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3166                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3167                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3168                }
3169                let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3170                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3171                Ok(response)
3172            }
3173            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3174        }
3175    }
3176    /// Push multiple workitems to a workitem queue
3177    /// If the file is less than 5 megabytes it will be attached to the workitem
3178    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3179    #[tracing::instrument(skip_all)]
3180    pub async fn push_workitems(
3181        &self,
3182        mut config: PushWorkitemsRequest,
3183    ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3184        if config.wiq.is_empty() && config.wiqid.is_empty() {
3185            return Err(OpenIAPError::ClientError(
3186                "No queue name or id provided".to_string(),
3187            ));
3188        }
3189        for wi in &mut config.items {
3190            for f in &mut wi.files {
3191                if f.filename.is_empty() && f.file.is_empty() {
3192                    debug!("Filename is empty");
3193                } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3194                    // does file exist?
3195                    if !std::path::Path::new(&f.filename).exists() {
3196                        debug!("File does not exist: {}", f.filename);
3197                    } else {
3198                        let filesize = std::fs::metadata(&f.filename).unwrap().len();
3199                        // if filesize is less than 5 meggabytes attach it, else upload
3200                        if filesize < 5 * 1024 * 1024 {
3201                            debug!("File {} exists so ATTACHING it.", f.filename);
3202                            let filename = std::path::Path::new(&f.filename)
3203                                .file_name()
3204                                .unwrap()
3205                                .to_str()
3206                                .unwrap();
3207                            f.file = std::fs::read(&f.filename).unwrap();
3208                            // f.file = compress_file(&f.filename).unwrap();
3209                            // f.compressed = false;
3210                            f.file = util::compress_file_to_vec(&f.filename).unwrap();
3211                            f.compressed = true;
3212                            f.filename = filename.to_string();
3213                            f.id = "findme".to_string();
3214                            trace!(
3215                                "File {} was read and assigned to f.file, size: {}",
3216                                f.filename,
3217                                f.file.len()
3218                            );
3219                        } else {
3220                            debug!("File {} exists so UPLOADING it.", f.filename);
3221                            let filename = std::path::Path::new(&f.filename)
3222                                .file_name()
3223                                .unwrap()
3224                                .to_str()
3225                                .unwrap();
3226                            let uploadconfig = UploadRequest {
3227                                filename: filename.to_string(),
3228                                collectionname: "fs.files".to_string(),
3229                                ..Default::default()
3230                            };
3231                            let uploadresult =
3232                                self.upload(uploadconfig, &f.filename).await.unwrap();
3233                            trace!("File {} was upload as {}", filename, uploadresult.id);
3234                            // f.filename = "".to_string();
3235                            f.id = uploadresult.id.clone();
3236                            f.filename = filename.to_string();
3237                        }
3238                    }
3239                } else {
3240                    debug!("File {} is already uploaded", f.filename);
3241                }
3242            }
3243        }
3244        let envelope = config.to_envelope();
3245        let result = self.send(envelope, None).await;
3246        match result {
3247            Ok(m) => {
3248                let data = match m.data {
3249                    Some(data) => data,
3250                    None => {
3251                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3252                    }
3253                };
3254                if m.command == "error" {
3255                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3256                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3257                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3258                }
3259                let response: PushWorkitemsResponse =
3260                    prost::Message::decode(data.value.as_ref())
3261                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3262                Ok(response)
3263            }
3264            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3265        }
3266    }
3267    /// Pop a workitem from a workitem queue, return None if no workitem is available
3268    /// Any files attached to the workitem will be downloaded to the downloadfolder ( default "." )
3269    #[tracing::instrument(skip_all)]
3270    pub async fn pop_workitem(
3271        &self,
3272        config: PopWorkitemRequest,
3273        downloadfolder: Option<&str>,
3274    ) -> Result<PopWorkitemResponse, OpenIAPError> {
3275        if config.wiq.is_empty() && config.wiqid.is_empty() {
3276            return Err(OpenIAPError::ClientError(
3277                "No queue name or id provided".to_string(),
3278            ));
3279        }
3280        let envelope = config.to_envelope();
3281        let result = self.send(envelope, None).await;
3282        match result {
3283            Ok(m) => {
3284                let data = match m.data {
3285                    Some(data) => data,
3286                    None => {
3287                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3288                    }
3289                };
3290                if m.command == "error" {
3291                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3292                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3293                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3294                }
3295                let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3296                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3297
3298                match &response.workitem {
3299                    Some(wi) => {
3300                        for f in &wi.files {
3301                            if !f.id.is_empty() {
3302                                let downloadconfig = DownloadRequest {
3303                                    id: f.id.clone(),
3304                                    collectionname: "fs.files".to_string(),
3305                                    ..Default::default()
3306                                };
3307                                let downloadresult =
3308                                    match self.download(downloadconfig, downloadfolder, None).await
3309                                    {
3310                                        Ok(r) => r,
3311                                        Err(e) => {
3312                                            debug!("Failed to download file: {}", e);
3313                                            continue;
3314                                        }
3315                                    };
3316                                debug!(
3317                                    "File {} was downloaded as {}",
3318                                    f.filename, downloadresult.filename
3319                                );
3320                            }
3321                        }
3322                    }
3323                    None => {
3324                        debug!("No workitem found");
3325                    }
3326                }
3327                Ok(response)
3328            }
3329            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3330        }
3331    }
3332    /// Update a workitem in a workitem queue
3333    /// If the file is less than 5 megabytes it will be attached to the workitem
3334    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3335    /// If a fileid is provided it will be used to update the file
3336    /// if a filename is provided without the id, it will be deleted
3337    #[tracing::instrument(skip_all)]
3338    pub async fn update_workitem(
3339        &self,
3340        mut config: UpdateWorkitemRequest,
3341    ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3342        match &config.workitem {
3343            Some(wiq) => {
3344                if wiq.id.is_empty() {
3345                    return Err(OpenIAPError::ClientError(
3346                        "No workitem id provided".to_string(),
3347                    ));
3348                }
3349            }
3350            None => {
3351                return Err(OpenIAPError::ClientError(
3352                    "No workitem provided".to_string(),
3353                ));
3354            }
3355        }
3356        for f in &mut config.files {
3357            if f.filename.is_empty() && f.file.is_empty() {
3358                debug!("Filename is empty");
3359            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3360                if !std::path::Path::new(&f.filename).exists() {
3361                    debug!("File does not exist: {}", f.filename);
3362                } else {
3363                    debug!("File {} exists so uploading it.", f.filename);
3364                    let filename = std::path::Path::new(&f.filename)
3365                        .file_name()
3366                        .unwrap()
3367                        .to_str()
3368                        .unwrap();
3369                    let uploadconfig = UploadRequest {
3370                        filename: filename.to_string(),
3371                        collectionname: "fs.files".to_string(),
3372                        ..Default::default()
3373                    };
3374                    let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3375                    trace!("File {} was upload as {}", filename, uploadresult.id);
3376                    f.id = uploadresult.id.clone();
3377                    f.filename = filename.to_string();
3378                }
3379            } else {
3380                debug!("Skipped file");
3381            }
3382        }
3383        let envelope = config.to_envelope();
3384        let result = self.send(envelope, None).await;
3385        match result {
3386            Ok(m) => {
3387                let data = match m.data {
3388                    Some(d) => d,
3389                    None => {
3390                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3391                    }
3392                };
3393                if m.command == "error" {
3394                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3395                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3396                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3397                }
3398                let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3399                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3400                Ok(response)
3401            }
3402            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3403        }
3404    }
3405    /// Delete a workitem from a workitem queue
3406    #[tracing::instrument(skip_all)]
3407    pub async fn delete_workitem(
3408        &self,
3409        config: DeleteWorkitemRequest,
3410    ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3411        if config.id.is_empty() {
3412            return Err(OpenIAPError::ClientError(
3413                "No workitem id provided".to_string(),
3414            ));
3415        }
3416        let envelope = config.to_envelope();
3417        let result = self.send(envelope, None).await;
3418        match result {
3419            Ok(m) => {
3420                let data = match m.data {
3421                    Some(d) => d,
3422                    None => {
3423                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3424                    }
3425                };
3426                if m.command == "error" {
3427                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3428                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3429                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3430                }
3431                let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3432                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3433                Ok(response)
3434            }
3435            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3436        }
3437    }
3438    /// Add a workitem queue to openiap instance
3439    #[tracing::instrument(skip_all)]
3440    pub async fn add_workitem_queue(
3441        &self,
3442        config: AddWorkItemQueueRequest,
3443    ) -> Result<WorkItemQueue, OpenIAPError> {
3444        if config.workitemqueue.is_none() {
3445            return Err(OpenIAPError::ClientError(
3446                "No workitem queue name provided".to_string(),
3447            ));
3448        }
3449        let envelope = config.to_envelope();
3450        let result = self.send(envelope, None).await;
3451        match result {
3452            Ok(m) => {
3453                let data = match m.data {
3454                    Some(d) => d,
3455                    None => {
3456                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3457                    }
3458                };
3459                if m.command == "error" {
3460                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3461                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3462                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3463                }
3464                let response: AddWorkItemQueueResponse =
3465                    prost::Message::decode(data.value.as_ref())
3466                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3467                match response.workitemqueue {
3468                    Some(wiq) => Ok(wiq),
3469                    None => {
3470                        return Err(OpenIAPError::ClientError(
3471                            "No workitem queue returned".to_string(),
3472                        ));
3473                    }
3474                }
3475            }
3476            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3477        }
3478    }
3479    /// Update a workitem queue in openiap instance
3480    #[tracing::instrument(skip_all)]
3481    pub async fn update_workitem_queue(
3482        &self,
3483        config: UpdateWorkItemQueueRequest,
3484    ) -> Result<WorkItemQueue, OpenIAPError> {
3485        if config.workitemqueue.is_none() {
3486            return Err(OpenIAPError::ClientError(
3487                "No workitem queue name provided".to_string(),
3488            ));
3489        }
3490        let envelope = config.to_envelope();
3491        let result = self.send(envelope, None).await;
3492        match result {
3493            Ok(m) => {
3494                let data = match m.data {
3495                    Some(d) => d,
3496                    None => {
3497                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3498                    }
3499                };
3500                if m.command == "error" {
3501                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3502                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3503                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3504                }
3505                let response: UpdateWorkItemQueueResponse =
3506                    prost::Message::decode(data.value.as_ref())
3507                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3508                match response.workitemqueue {
3509                    Some(wiq) => Ok(wiq),
3510                    None => {
3511                        return Err(OpenIAPError::ClientError(
3512                            "No workitem queue returned".to_string(),
3513                        ));
3514                    }
3515                }
3516            }
3517            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3518        }
3519    }
3520    /// Delete a workitem queue from openiap instance
3521    #[tracing::instrument(skip_all)]
3522    pub async fn delete_workitem_queue(
3523        &self,
3524        config: DeleteWorkItemQueueRequest,
3525    ) -> Result<(), OpenIAPError> {
3526        if config.wiq.is_empty() && config.wiqid.is_empty() {
3527            return Err(OpenIAPError::ClientError(
3528                "No workitem queue name or id provided".to_string(),
3529            ));
3530        }
3531        let envelope = config.to_envelope();
3532        let result = self.send(envelope, None).await;
3533        match result {
3534            Ok(m) => {
3535                let data = match m.data {
3536                    Some(d) => d,
3537                    None => {
3538                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3539                    }
3540                };
3541                if m.command == "error" {
3542                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3543                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3544                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3545                }
3546                Ok(())
3547            }
3548            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3549        }
3550    }
3551    /// Run custom command on server. Custom commands are commands who is "on trail", they may change and are not ready to be moved to the fixed protobuf format yet
3552    #[tracing::instrument(skip_all)]
3553    pub async fn custom_command(
3554        &self,
3555        config: CustomCommandRequest,
3556        timeout: Option<tokio::time::Duration>,
3557    ) -> Result<String, OpenIAPError> {
3558        if config.command.is_empty() {
3559            return Err(OpenIAPError::ClientError("No command provided".to_string()));
3560        }
3561        let envelope = config.to_envelope();
3562        let result = self.send(envelope, timeout).await;
3563        match result {
3564            Ok(m) => {
3565                let data = match m.data {
3566                    Some(d) => d,
3567                    None => {
3568                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3569                    }
3570                };
3571                if m.command == "error" {
3572                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3573                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3574                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3575                }
3576                let response: CustomCommandResponse =
3577                    prost::Message::decode(data.value.as_ref())
3578                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3579                Ok(response.result)
3580            }
3581            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3582        }
3583    }
3584    /// Delete a package from the database, cleaning up all all files and data
3585    #[tracing::instrument(skip_all)]
3586    pub async fn delete_package(&self, packageid: &str) -> Result<(), OpenIAPError> {
3587        let config = DeletePackageRequest::byid(packageid);
3588        let envelope = config.to_envelope();
3589        let result = self.send(envelope, None).await;
3590        match result {
3591            Ok(m) => {
3592                let data = match m.data {
3593                    Some(data) => data,
3594                    None => {
3595                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3596                    }
3597                };
3598                if m.command == "error" {
3599                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3600                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3601                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3602                }
3603                // prost::Message::decode(data.value.as_ref())
3604                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3605                Ok(())
3606            }
3607            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3608        }
3609    }
3610    /// Start Agent
3611    #[tracing::instrument(skip_all)]
3612    pub async fn start_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3613        let config = StartAgentRequest::byid(agentid);
3614        let envelope = config.to_envelope();
3615        let result = self.send(envelope, None).await;
3616        match result {
3617            Ok(m) => {
3618                let data = match m.data {
3619                    Some(d) => d,
3620                    None => {
3621                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3622                    }
3623                };
3624                if m.command == "error" {
3625                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3626                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3627                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3628                }
3629                // prost::Message::decode(data.value.as_ref())
3630                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3631                Ok(())
3632            }
3633            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3634        }
3635    }
3636    /// Stop an agent, this will cleanup all resources and stop the agent
3637    #[tracing::instrument(skip_all)]
3638    pub async fn stop_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3639        let config = StopAgentRequest::byid(agentid);
3640        let envelope = config.to_envelope();
3641        let result = self.send(envelope, None).await;
3642        match result {
3643            Ok(m) => {
3644                let data = match m.data {
3645                    Some(d) => d,
3646                    None => {
3647                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3648                    }
3649                };
3650                if m.command == "error" {
3651                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3652                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3653                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3654                }
3655                // prost::Message::decode(data.value.as_ref())
3656                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3657                Ok(())
3658            }
3659            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3660        }
3661    }
3662    /// Delete a pod from an agent, on kubernetes this will remove the pod and kubernetes will re-create it, on docker this will remove the pod. Then use start_agent to start the agent again
3663    #[tracing::instrument(skip_all)]
3664    pub async fn delete_agent_pod(&self, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
3665        let config = DeleteAgentPodRequest::byid(agentid, podname);
3666        let envelope = config.to_envelope();
3667        let result = self.send(envelope, None).await;
3668        match result {
3669            Ok(m) => {
3670                let data = match m.data {
3671                    Some(d) => d,
3672                    None => {
3673                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3674                    }
3675                };
3676                if m.command == "error" {
3677                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3678                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3679                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3680                }
3681                // prost::Message::decode(data.value.as_ref())
3682                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3683                Ok(())
3684            }
3685            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3686        }
3687    }
3688    /// Delete an agent, this will cleanup all resources and delete the agent
3689    #[tracing::instrument(skip_all)]
3690    pub async fn delete_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3691        let config = DeleteAgentRequest::byid(agentid);
3692        let envelope = config.to_envelope();
3693        let result = self.send(envelope, None).await;
3694        match result {
3695            Ok(m) => {
3696                let data = match m.data {
3697                    Some(d) => d,
3698                    None => {
3699                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3700                    }
3701                };
3702                if m.command == "error" {
3703                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3704                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3705                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3706                }
3707                // prost::Message::decode(data.value.as_ref())
3708                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3709                Ok(())
3710            }
3711            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3712        }
3713    }
3714    /// Get all pods associated with an agent, if stats is true, it will return memory and cpu usage for each pod
3715    #[tracing::instrument(skip_all)]
3716    pub async fn get_agent_pods(&self, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
3717        let config = GetAgentPodsRequest::byid(agentid, stats);
3718        let envelope = config.to_envelope();
3719        let result = self.send(envelope, None).await;
3720        match result {
3721            Ok(m) => {
3722                let data = match m.data {
3723                    Some(d) => d,
3724                    None => {
3725                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3726                    }
3727                };
3728                if m.command == "error" {
3729                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3730                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3731                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3732                }
3733                let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
3734                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3735                Ok(response.results)
3736            }
3737            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3738        }
3739    }
3740    /// Get logs from a pod associated with an agent, leave podname empty to get logs from all pods
3741    #[tracing::instrument(skip_all)]
3742    pub async fn get_agent_pod_logs(
3743        &self,
3744        agentid: &str,
3745        podname: &str,
3746    ) -> Result<String, OpenIAPError> {
3747        let config = GetAgentLogRequest::new(agentid, podname);
3748        let envelope = config.to_envelope();
3749        let result = self.send(envelope, None).await;
3750        match result {
3751            Ok(m) => {
3752                let data = match m.data {
3753                    Some(d) => d,
3754                    None => {
3755                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3756                    }
3757                };
3758                if m.command == "error" {
3759                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3760                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3761                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3762                }
3763                let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
3764                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3765                Ok(response.result)
3766            }
3767            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3768        }
3769    }
3770
3771    /// Create/update a customer in the OpenIAP service. If stripe has been configured, it will create or update a customer in stripe as well
3772    /// A customer is a customer object that can only be updated using this function, and 2 roles ( customername admins and customername users )
3773    #[tracing::instrument(skip_all)]
3774    pub async fn ensure_customer(
3775        &self,
3776        config: EnsureCustomerRequest,
3777    ) -> Result<EnsureCustomerResponse, OpenIAPError> {
3778        if config.customer.is_none() && config.stripe.is_none() {
3779            return Err(OpenIAPError::ClientError(
3780                "No customer or stripe provided".to_string(),
3781            ));
3782        }
3783        let envelope = config.to_envelope();
3784        let result = self.send(envelope, None).await;
3785        match result {
3786            Ok(m) => {
3787                let data = match m.data {
3788                    Some(d) => d,
3789                    None => {
3790                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3791                    }
3792                };
3793                if m.command == "error" {
3794                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3795                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3796                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3797                }
3798                let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
3799                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3800                Ok(response)
3801            }
3802            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3803        }
3804    }
3805    /// Create a new workflow instance, to be used to workflow in/out nodes in NodeRED
3806    #[tracing::instrument(skip_all)]
3807    pub async fn create_workflow_instance(
3808        &self,
3809        config: CreateWorkflowInstanceRequest,
3810    ) -> Result<String, OpenIAPError> {
3811        if config.workflowid.is_empty() {
3812            return Err(OpenIAPError::ClientError(
3813                "No workflow id provided".to_string(),
3814            ));
3815        }
3816        let envelope = config.to_envelope();
3817        let result = self.send(envelope, None).await;
3818        match result {
3819            Ok(m) => {
3820                let data = match m.data {
3821                    Some(d) => d,
3822                    None => {
3823                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3824                    }
3825                };
3826                if m.command == "error" {
3827                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3828                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3829                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3830                }
3831                let response: CreateWorkflowInstanceResponse =
3832                    prost::Message::decode(data.value.as_ref())
3833                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3834                Ok(response.instanceid)
3835            }
3836            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3837        }
3838    }
3839
3840    /// Invoke a workflow in the OpenRPA robot where robotid is the userid of the user the robot is running as, or a roleid with RPA enabled
3841    #[tracing::instrument(skip_all)]
3842    pub async fn invoke_openrpa(
3843        &self,
3844        config: InvokeOpenRpaRequest,
3845        timeout: Option<tokio::time::Duration>,
3846    ) -> Result<String, OpenIAPError> {
3847        if config.robotid.is_empty() {
3848            return Err(OpenIAPError::ClientError(
3849                "No robot id provided".to_string(),
3850            ));
3851        }
3852        if config.workflowid.is_empty() {
3853            return Err(OpenIAPError::ClientError(
3854                "No workflow id provided".to_string(),
3855            ));
3856        }
3857
3858        let (tx, rx) = oneshot::channel::<String>();
3859        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3860
3861        let q = self
3862            .register_queue(
3863                RegisterQueueRequest {
3864                    queuename: "".to_string(),
3865                },
3866                Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3867                    let tx = tx.clone();
3868                    Box::pin(async move {
3869                        let json = event.data.clone();
3870                        let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3871                        let command: String = obj["command"].as_str().unwrap().to_string();
3872                        debug!("Received event: {:?}", event);
3873                        if command.eq("invokesuccess") {
3874                            debug!("Robot successfully started running workflow");
3875                        } else if command.eq("invokeidle") {
3876                            debug!("Workflow went idle");
3877                        } else if command.eq("invokeerror") {
3878                            debug!("Robot failed to run workflow");
3879                            let tx = tx.lock().unwrap().take().unwrap();
3880                            tx.send(event.data).unwrap();
3881                        } else if command.eq("timeout") {
3882                            debug!("No robot picked up the workflow");
3883                            let tx = tx.lock().unwrap().take().unwrap();
3884                            tx.send(event.data).unwrap();
3885                        } else if command.eq("invokecompleted") {
3886                            debug!("Robot completed running workflow");
3887                            let tx = tx.lock().unwrap().take().unwrap();
3888                            tx.send(event.data).unwrap();
3889                        } else {
3890                            let tx = tx.lock().unwrap().take().unwrap();
3891                            tx.send(event.data).unwrap();
3892                        }
3893                        None
3894                    })
3895                }),
3896            )
3897            .await
3898            .unwrap();
3899        debug!("Registered Response Queue: {:?}", q);
3900        let data = format!(
3901            "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"payload\": {}}}",
3902            config.workflowid, config.payload
3903        );
3904        debug!("Send Data: {}", data);
3905        debug!("To Queue: {} With reply to: {}", config.robotid, q);
3906        let config = QueueMessageRequest {
3907            queuename: config.robotid.clone(),
3908            replyto: q.clone(),
3909            data,
3910            ..Default::default()
3911        };
3912
3913        let envelope = config.to_envelope();
3914
3915        let result = self.send(envelope, timeout).await;
3916        match result {
3917            Ok(m) => {
3918                let data = match m.data {
3919                    Some(d) => d,
3920                    None => {
3921                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
3922                    }
3923                };
3924                if m.command == "error" {
3925                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3926                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3927                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3928                }
3929                // prost::Message::decode(data.value.as_ref())
3930                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3931
3932                let duration = timeout.unwrap_or_else(|| self.get_default_timeout());
3933                // let json = rx.await.unwrap();
3934                let json = match tokio::time::timeout(duration, rx).await {
3935                    Ok(Ok(val)) => {
3936                        // Success, unregister queue before returning
3937                        let _ = self.unregister_queue(&q).await;
3938                        val
3939                    },
3940                    Ok(Err(e)) => {
3941                        let _ = self.unregister_queue(&q).await;
3942                        return Err(OpenIAPError::CustomError(e.to_string()));
3943                    },
3944                    Err(_) => {
3945                        let _ = self.unregister_queue(&q).await;
3946                        return Err(OpenIAPError::ServerError("Timeout".to_string()));
3947                    },
3948                };
3949                debug!("Received json result: {:?}", json);
3950                let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3951                let command: String = obj["command"].as_str().unwrap().to_string();
3952                let mut data = "".to_string();
3953                if obj["data"].as_str().is_some() {
3954                    data = obj["data"].as_str().unwrap().to_string();
3955                } else if obj["data"].as_object().is_some() {
3956                    data = obj["data"].to_string();
3957                }
3958                if !command.eq("invokecompleted") {
3959                    if command.eq("timeout") {
3960                        return Err(OpenIAPError::ServerError("Timeout".to_string()));
3961                    } else {
3962                        if data.is_empty() {
3963                            return Err(OpenIAPError::ServerError(
3964                                "Error with no message".to_string(),
3965                            ));
3966                        }
3967                        return Err(OpenIAPError::ServerError(data));
3968                    }
3969                }
3970                // let response = self.unregister_queue(&q).await;
3971                // match response {
3972                //     Ok(_) => {
3973                //         debug!("Unregistered Response Queue: {:?}", q);
3974                //     }
3975                //     Err(e) => {
3976                //         error!("Failed to unregister Response Queue: {:?}", e);
3977                //     }
3978                // }
3979                Ok(data)
3980            }
3981            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3982        }
3983    }
3984}
3985