openiap_client/
lib.rs

1#![warn(missing_docs)]
2//! The `openiap.client` crate provides the [Client] struct and its methods.
3//! For now this is only the GRPC and WebSocket client, later we will add a web rest, named pipe and TCP client as well.
4//! Initialize a new client, by calling the [Client::new_connect] method.
5//! ```
6//! use openiap_client::{OpenIAPError, Client, QueryRequest};
7//! #[tokio::main]
8//! async fn main() -> Result<(), OpenIAPError> {
9//!     let client = Client::new_connect("").await?;
10//!     let q = client.query( QueryRequest::with_projection(
11//!         "entities",
12//!         "{}",
13//!         "{\"name\":1}"
14//!     )).await?;
15//!     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
16//!     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
17//!     for item in items {
18//!         println!("Item: {:?}", item);
19//!     }
20//!     Ok(())
21//! }
22//! ```
23
24pub use openiap_proto::errors::*;
25pub use openiap_proto::protos::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use protos::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30
31use tokio::task::JoinHandle;
32use tokio_tungstenite::{WebSocketStream};
33use tracing::{debug, error, info, trace};
34type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
35type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{enable_tracing, disable_tracing};
55
56type QuerySender = oneshot::Sender<Envelope>;
57type StreamSender = mpsc::Sender<Vec<u8>>;
58type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
59use futures::{StreamExt };
60use async_channel::{unbounded};
61const VERSION: &str = "0.0.20";
62
63
64/// The `Client` struct provides the client for the OpenIAP service.
65/// Initialize a new client, by calling the [Client::new_connect] method.
66#[derive(Clone)]
67pub struct Client {
68    /// Ensure we use only call connect once, and then use re-connect instead.
69    connect_called: Arc<std::sync::Mutex<bool>>,
70    /// The tokio runtime.
71    runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,
72
73    /// Keep track of usage of the client
74    stats: Arc<std::sync::Mutex<ClientStatistics>>,
75
76    task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
77    /// The inner client object
78    pub client: Arc<std::sync::Mutex<ClientEnum>>,
79    /// The signed in user.
80    user: Arc<std::sync::Mutex<Option<User>>>,
81    /// The inner client.
82    pub inner: Arc<Mutex<ClientInner>>,
83    /// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
84    pub config: Arc<std::sync::Mutex<Option<Config>>>,
85    /// Should client automatically reconnect, if disconnected?
86    pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
87    /// URL used to connect to server, processed and without credentials
88    pub url: Arc<std::sync::Mutex<String>>,
89    /// Username used to connect to server
90    pub username: Arc<std::sync::Mutex<String>>,
91    /// Password used to connect to server
92    pub password: Arc<std::sync::Mutex<String>>,
93    /// JWT token used to connect to server
94    pub jwt: Arc<std::sync::Mutex<String>>,
95    agent_name: Arc<std::sync::Mutex<String>>,
96    agent_version: Arc<std::sync::Mutex<String>>,
97    event_sender: async_channel::Sender<ClientEvent>,
98    event_receiver: async_channel::Receiver<ClientEvent>,
99    out_envelope_sender: async_channel::Sender<Envelope>,
100    out_envelope_receiver: async_channel::Receiver<Envelope>,
101    /// The client connection state.
102    pub state: Arc<std::sync::Mutex<ClientState>>,
103    /// Inceasing message count, used as unique id for messages.
104    pub msgcount: Arc<std::sync::Mutex<i32>>,
105    /// Reconnect interval in milliseconds, this will slowly increase if we keep getting disconnected.
106    pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
107    
108}
109/// The `ClientStatistics` struct provides the statistics for usage of the client
110#[derive(Clone, Default)]
111pub struct ClientStatistics {
112    connection_attempts: u64,
113    connections: u64,
114    package_tx: u64,
115    package_rx: u64,
116    signin: u64,
117    download: u64,
118    getdocumentversion: u64,
119    customcommand: u64,
120    listcollections: u64,
121    createcollection: u64,
122    dropcollection: u64,
123    ensurecustomer: u64,
124    invokeopenrpa: u64,
125    registerqueue: u64,
126    registerexchange: u64,
127    unregisterqueue: u64,
128    watch: u64,
129    unwatch: u64,
130    queuemessage: u64,
131    pushworkitem: u64,
132    pushworkitems: u64,
133    popworkitem: u64,
134    updateworkitem: u64,
135    deleteworkitem: u64,
136    addworkitemqueue: u64,
137    updateworkitemqueue: u64,
138    deleteworkitemqueue: u64,
139    getindexes: u64,
140    createindex: u64,
141    dropindex: u64,
142    upload: u64,
143    query: u64,
144    count: u64,
145    distinct: u64,
146    aggregate: u64,
147    insertone: u64,
148    insertmany: u64,
149    insertorupdateone: u64,
150    insertorupdatemany: u64,
151    updateone: u64,
152    updatedocument: u64,
153    deleteone: u64,
154    deletemany: u64,
155}
156/// The `ClientInner` struct provides the inner client for the OpenIAP service.
157#[derive(Clone)]
158pub struct ClientInner {
159    /// list of queries ( messages sent to server we are waiting on a response for )
160    pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
161    /// Active streams the server (or client) has opened
162    pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
163    /// List of active watches ( change streams )
164    #[allow(clippy::type_complexity)]
165    pub watches:
166        Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
167    /// List of active queues ( message queues / mqqt queues or exchanges )
168    #[allow(clippy::type_complexity)]
169    pub queues:
170        Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(QueueEvent) + Send + Sync>>>>,
171}
172/// Client enum, used to determine which client to use.
173#[derive(Clone, Debug)]
174pub enum ClientEnum {
175    /// Not set yet
176    None,
177    /// Used when client wants to connect using gRPC 
178    Grpc(FlowServiceClient<tonic::transport::Channel>),
179    /// Used when client wants to connect using websockets
180    WS(Arc<Mutex<Sock>>)
181}
182/// Client event enum, used to determine which event has occurred.
183#[derive(Debug, Clone, PartialEq)]
184pub enum ClientEvent {
185    /// The client has connected
186    Connecting,
187    /// The client has connected
188    Connected,
189    /// The client has disconnected
190    Disconnected(String),
191    /// The client has signed in
192    SignedIn,
193    // The client has signed out
194    // SignedOut,
195    // The client has received a message
196    // Message(Envelope),
197    // The client has received a ping event from the server
198    // Ping,
199    // The client has received a stream
200    // Stream(Vec<u8>),
201    // The client has received a watch event
202    // Watch(WatchEvent),
203    // The client has received a queue event
204    // Queue(QueueEvent),
205}
206/// Client event enum, used to determine which event has occurred.
207#[derive(Debug, Clone, PartialEq)]
208pub enum ClientState {
209    /// The client is disconnected
210    Disconnected,
211    /// The client connecting
212    Connecting,
213    /// The client is connected
214    Connected,
215    /// The client is signed in and connected
216    Signedin
217}
218/// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
219#[derive(Debug, Clone, serde::Deserialize)]
220#[allow(dead_code)]
221pub struct Config {
222    #[serde(default)]
223    wshost: String,
224    #[serde(default)]
225    wsurl: String,
226    #[serde(default)]
227    domain: String,
228    #[serde(default)]
229    auto_create_users: bool,
230    #[serde(default)]
231    namespace: String,
232    #[serde(default)]
233    agent_domain_schema: String,
234    #[serde(default)]
235    version: String,
236    #[serde(default)]
237    validate_emails: bool,
238    #[serde(default)]
239    forgot_pass_emails: bool,
240    #[serde(default)]
241    supports_watch: bool,
242    #[serde(default)]
243    amqp_enabled_exchange: bool,
244    #[serde(default)]
245    multi_tenant: bool,
246    #[serde(default)]
247    enable_entity_restriction: bool,
248    #[serde(default)]
249    enable_web_tours: bool,
250    #[serde(default)]
251    collections_with_text_index: Vec<String>,
252    #[serde(default)]
253    timeseries_collections: Vec<String>,
254    #[serde(default)]
255    ping_clients_interval: i32,
256    #[serde(default)]
257    validlicense: bool,
258    #[serde(default)]
259    forceddomains: Vec<String>,
260    #[serde(default)]
261    grafana_url: String,
262    #[serde(default)]
263    otel_metric_url: String,
264    #[serde(default)]
265    enable_analytics: bool,
266}
267impl std::fmt::Debug for ClientInner {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        f.debug_struct("ClientInner")
270            // .field("client", &self.client)
271            .field("queries", &self.queries)
272            .field("streams", &self.streams)
273            .finish()
274    }
275}
276impl Default for Client {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282impl Client {
283    /// Create a new client.
284    pub fn new() -> Self {
285        let (ces, cer) = unbounded::<ClientEvent>();
286        let (out_es, out_er) = unbounded::<Envelope>();
287        Self {
288            task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
289            stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
290            user: Arc::new(std::sync::Mutex::new(None)),
291            client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
292            connect_called: Arc::new(std::sync::Mutex::new(false)),
293            runtime: Arc::new(std::sync::Mutex::new(None)),
294            msgcount: Arc::new(std::sync::Mutex::new(-1)),
295            reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
296            inner: Arc::new(Mutex::new(ClientInner {
297                queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
298                streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
299                watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
300                queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
301            })),
302            config: Arc::new(std::sync::Mutex::new(None)),
303            auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
304            url: Arc::new(std::sync::Mutex::new("".to_string())),
305            username: Arc::new(std::sync::Mutex::new("".to_string())),
306            password: Arc::new(std::sync::Mutex::new("".to_string())),
307            jwt: Arc::new(std::sync::Mutex::new("".to_string())),
308            agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
309            agent_version: Arc::new(std::sync::Mutex::new(VERSION.to_string())),
310            event_sender: ces,
311            event_receiver: cer,
312            out_envelope_sender: out_es,
313            out_envelope_receiver: out_er,
314            state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
315        }
316    }
317    /// Connect the client to the OpenIAP server.
318    #[tracing::instrument(skip_all)]
319    pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
320        let rt = match tokio::runtime::Runtime::new() {
321            Ok(rt) => rt,
322            Err(e) => {
323                return Err(OpenIAPError::ClientError(format!(
324                    "Failed to create tokio runtime: {}",
325                    e
326                )));
327            }
328        };
329        self.set_runtime(Some(rt));
330        tokio::task::block_in_place(|| {
331            let handle = self.get_runtime_handle();
332            handle.block_on(self.connect_async(dst))
333        })
334    }
335
336    /// Load the configuration from the server.
337    #[allow(unused_variables)]
338    pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
339        let config: Option<Config>;
340        let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
341        let mut port = url.port().unwrap_or(80);
342        if port == 50051 {
343            port = 3000;
344        }
345        let configurl = if issecure {
346            format!(
347                "{}://{}:{}/config",
348                "https",
349                url.host_str()
350                    .unwrap_or("localhost.openiap.io")
351                    .replace("grpc.", ""),
352                port
353            )
354        } else {
355            format!(
356                "{}://{}:{}/config",
357                "http",
358                url.host_str()
359                    .unwrap_or("localhost.openiap.io")
360                    .replace("grpc.", ""),
361                port
362            )
363        };
364
365        let configurl = url::Url::parse(configurl.as_str())
366            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
367        let o = minreq::get(configurl).send();
368        match o {
369            Ok(_) => {
370                let response = match o {
371                    Ok(response) => response,
372                    Err(e) => {
373                        error!("Failed to get config: {}", e);
374                        return None;
375                    }
376                };
377                if response.status_code == 200 {
378                    let body = response.as_str().unwrap();
379                    config = Some(match serde_json::from_str(body) {
380                        Ok(config) => config,
381                        Err(e) => {
382                            error!("Failed to parse config: {}", e);
383                            return None;
384                        }
385                    });
386                } else {
387                    config = None;
388                }
389            }
390            Err(e) => {
391                error!("Failed to get config: {}", e);
392                return None;
393            }
394        }
395        let mut _enable_analytics = true;
396        let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
397        let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
398        if apihostname.starts_with("grpc.") {
399            apihostname = apihostname[5..].to_string();
400        }
401    
402        if config.is_some() {
403            let config = config.as_ref().unwrap();
404            if !config.otel_metric_url.is_empty() {
405                _otel_metric_url = config.otel_metric_url.clone();
406            }
407            if !config.domain.is_empty() {
408                apihostname = config.domain.clone();
409            }
410            _enable_analytics = config.enable_analytics;
411        }
412        #[cfg(feature = "otel")]
413        if _enable_analytics {
414            let agent_name = self.get_agent_name();
415            let agent_version = self.get_agent_version();
416            match otel::init_telemetry(&agent_name, &agent_version, VERSION, &apihostname, _otel_metric_url.as_str(), &self.stats) {
417                Ok(_) => (),
418                Err(e) => {
419                    error!("Failed to initialize telemetry: {}", e);
420                    return None;
421                }
422            }
423        }
424        config
425    }
426
427    /// Connect the client to the OpenIAP server.
428    #[tracing::instrument(skip_all)]
429    pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
430        #[cfg(test)]
431        {   
432            // enable_tracing("openiap=trace", "new");
433            // enable_tracing("openiap=debug", "new");
434            // enable_tracing("trace", "");
435            enable_tracing("openiap=error", "");
436            // enable_tracing("openiap=debug", "");
437        }
438        if self.is_connect_called() {
439            self.set_auto_reconnect(true);
440            return self.reconnect().await;
441        }
442        let mut strurl = dst.to_string();
443        if strurl.is_empty() {
444            strurl = std::env::var("apiurl").unwrap_or("".to_string());
445            if strurl.is_empty() {
446                strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
447            }
448            if strurl.is_empty() {
449                strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
450            }
451        }
452        if strurl.is_empty() {
453            return Err(OpenIAPError::ClientError("No URL provided".to_string()));
454        }
455        let url = url::Url::parse(strurl.as_str())
456            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
457        let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
458        if url.scheme() != "http"
459            && url.scheme() != "https"
460            && url.scheme() != "grpc"
461            && url.scheme() != "ws"
462            && url.scheme() != "wss"
463        {
464            return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
465        }
466        if url.scheme() == "grpc" {
467            if url.port() == Some(443) {
468                strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
469            } else { 
470                strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
471            }
472        }
473        let mut url = url::Url::parse(strurl.as_str())
474            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
475        let mut username = "".to_string();
476        let mut password = "".to_string();
477        if let Some(p) = url.password() {
478            password = p.to_string();
479        }
480        if !url.username().is_empty() {
481            username = url.username().to_string();
482        }
483        if !username.is_empty() && !password.is_empty() {
484            self.set_username(&username);
485            self.set_password(&password);
486        }
487        url = url::Url::parse(strurl.as_str())
488            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
489
490        if url.port().is_none() {
491            strurl = format!(
492                "{}://{}",
493                url.scheme(),
494                url.host_str().unwrap_or("app.openiap.io")
495            );
496        } else {
497            strurl = format!(
498                "{}://{}:{}",
499                url.scheme(),
500                url.host_str().unwrap_or("localhost.openiap.io"),
501                url.port().unwrap_or(80)
502            );
503        }
504        info!("Connecting to {}", strurl);
505        let config = self.load_config(strurl.as_str(), &url).await;
506        if !usegprc {
507            strurl = format!("{}/ws/v2", strurl);
508
509            let (_stream_tx, stream_rx) = mpsc::channel(60);
510
511            let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
512                Ok((socket, _)) => socket,
513                Err(e) => {
514                    return Err(OpenIAPError::ClientError(format!(
515                        "Failed to connect to WS: {}",
516                        e
517                    )));
518                }
519            };
520            self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
521            self.set_connect_called(true);
522            self.set_config(config);
523            self.set_url(&strurl);
524            match self.setup_ws(&strurl).await {
525                Ok(_) => (),
526                Err(e) => {
527                    return Err(OpenIAPError::ClientError(format!(
528                        "Failed to setup WS: {}",
529                        e
530                    )));
531                }
532            }
533            let client2 = self.clone();
534            // tokio::task::Builder::new().name("Old Websocket receiver").spawn(async move {
535            tokio::task::spawn(async move {
536                tokio_stream::wrappers::ReceiverStream::new(stream_rx)
537                    .for_each(|envelope: Envelope| async {
538                        let command = envelope.command.clone();
539                        let rid = envelope.rid.clone();
540                        let id = envelope.id.clone();
541                        trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
542                        client2.parse_incomming_envelope(envelope).await;
543                    })
544                    .await;
545            }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn Old Websocket receiver task: {:?}", e)))?;
546        } else {
547            if url.scheme() == "http" {
548                let response = Client::connect_grpc(strurl.clone()).await;
549                match response {
550                    Ok(client) => {
551                        self.set_client(ClientEnum::Grpc(client));
552                    }
553                    Err(e) => {
554                        return Err(OpenIAPError::ClientError(format!(
555                            "Failed to connect: {}",
556                            e
557                        )));
558                    }
559                }
560            } else {
561                let uri = tonic::transport::Uri::builder()
562                    .scheme(url.scheme())
563                    .authority(url.host().unwrap().to_string())
564                    .path_and_query("/")
565                    .build();
566                let uri = match uri {
567                    Ok(uri) => uri,
568                    Err(e) => {
569                        return Err(OpenIAPError::ClientError(format!(
570                            "Failed to build URI: {}",
571                            e
572                        )));
573                    }
574                };
575                let channel = Channel::builder(uri)
576                    .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
577                let channel = match channel {
578                    Ok(channel) => channel,
579                    Err(e) => {
580                        return Err(OpenIAPError::ClientError(format!(
581                            "Failed to build channel: {}",
582                            e
583                        )));
584                    }
585                };
586                let channel = channel.connect().await;
587                let channel = match channel {
588                    Ok(channel) => channel,
589                    Err(e) => {
590                        return Err(OpenIAPError::ClientError(format!(
591                            "Failed to connect: {}",
592                            e
593                        )));
594                    }
595                };
596                self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
597            }
598            self.set_connect_called(true);
599            self.set_config(config);
600            self.set_url(&strurl);
601            self.setup_grpc_stream().await?;
602        };
603        self.post_connected().await
604    }
605
606    /// Connect will initializes a new client and starts a connection to an OpenIAP server.\
607    /// Use "" to autodetect the server from the environment variables (apiurl or grpcapiurl), or provide a URL.\
608    /// \
609    /// You can add username and password, to login using local provider, or set them using OPENIAP_USERNAME and OPENIAP_PASSWORD environment variables.
610    /// It is highly recommended to not user username and password, but instead use a JWT token, set using the OPENIAP_JWT (or jwt) environment variable.
611    /// You can use the openiap vs.code extension to manage this, if you need to generate one your self, login to the OpenIAP server and then open the /jwtlong page.
612    /// If credentials are not provided, the client will run as guest.\
613    /// If credentials are found, it will call [Client::signin] after successfully connecting to the server.
614    /// 
615    /// To troubleshoot issues, call [enable_tracing].
616    /// ```
617    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
618    /// #[tokio::main]
619    /// async fn main() -> Result<(), OpenIAPError> {
620    ///     let client = Client::new_connect("").await?;
621    ///     let q = client.query( QueryRequest::with_projection(
622    ///         "entities",
623    ///         "{}",
624    ///         "{\"name\":1}"
625    ///     )).await?;
626    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
627    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
628    ///     for item in items {
629    ///         println!("Item: {:?}", item);
630    ///     }
631    ///     Ok(())
632    /// }
633    /// ```
634    #[tracing::instrument(skip_all)]
635    pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
636        #[cfg(test)]
637        {   
638            // enable_tracing("openiap=trace", "new");
639            // enable_tracing("openiap=debug", "new");
640            // enable_tracing("trace", "");
641            enable_tracing("openiap=error", "");
642            // enable_tracing("openiap=debug", "");
643        }
644        let client  = Client::new();
645        client.connect_async(dst).await?;
646        Ok(client)
647    }
648    /// Handle auto-signin after a connection has been established.
649    pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
650        if self.get_username().is_empty() && self.get_password().is_empty() {
651            self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
652            self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
653        }
654        if !self.get_username().is_empty() && !self.get_password().is_empty() {
655            debug!("Signing in with username: {}", self.get_username());
656            let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
657            let loginresponse = self.signin(signin).await;
658            match loginresponse {
659                Ok(response) => {
660                    self.reset_reconnect_ms();
661                    self.set_connected(ClientState::Connected, None);
662                    info!("Signed in as {}", response.user.as_ref().unwrap().username);
663                    Ok(())
664                }
665                Err(_e) => {
666                    self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
667                    Err(_e)
668                }
669            }
670        } else {
671            self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
672            if self.get_jwt().is_empty() {
673                self.set_jwt(&std::env::var("jwt").unwrap_or_default());
674            }
675            if !self.get_jwt().is_empty() {
676                debug!("Signing in with JWT");
677                let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
678                let loginresponse = self.signin(signin).await;
679                match loginresponse {
680                    Ok(response) => match response.user {
681                        Some(user) => {
682                            self.reset_reconnect_ms();
683                            info!("Signed in as {}", user.username);
684                            self.set_connected(ClientState::Connected, None);
685                            Ok(())
686                        }
687                        None => {
688                            self.reset_reconnect_ms();
689                            info!("Signed in as guest");
690                            self.set_connected(ClientState::Connected, None);
691                            Ok(())
692                            // Err(OpenIAPError::ClientError("Signin returned no user object".to_string()))
693                        }
694                    },
695                    Err(_e) => {
696                        self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
697                        Err(_e)
698                    }
699                }
700            } else {
701                self.reset_reconnect_ms();
702                match self.get_element().await {
703                    Ok(_) => {
704                        debug!("Connected, No credentials provided so is running as guest");
705                        self.set_connected(ClientState::Connected, None);
706                        Ok(())
707                    },
708                    Err(e) => {
709                        self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
710                        Err(e)
711                    }
712                }
713            }
714        }
715    }
716    /// Reconnect will attempt to reconnect to the OpenIAP server.
717    #[tracing::instrument(skip_all)]
718    pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
719        let state = self.get_state();
720        if state == ClientState::Connected || state == ClientState::Signedin {
721            return Ok(());
722        }
723        if !self.is_auto_reconnect() {
724            return Ok(());   
725        }
726        let client = self.get_client();
727    
728        match client {
729            ClientEnum::WS(ref _client) => {
730                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
731                self.setup_ws(&self.get_url()).await?;
732                debug!("Completed reconnecting to websocket");
733                self.post_connected().await
734            }
735            ClientEnum::Grpc(ref _client) => {
736                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
737                match self.setup_grpc_stream().await {
738                    Ok(_) => {
739                        debug!("Completed reconnecting to gRPC");
740                        self.post_connected().await
741                    },
742                    Err(e) => {
743                        return Err(OpenIAPError::ClientError(format!(
744                            "Failed to setup gRPC stream: {}",
745                            e
746                        )));
747                    }
748                }
749            }
750            ClientEnum::None => {
751                return Err(OpenIAPError::ClientError("Invalid client".to_string()));
752            }
753        }
754    }
755    /// Disconnect the client from the OpenIAP server.
756    pub fn disconnect(&self) {
757        self.set_auto_reconnect(false);
758        self.set_connected(ClientState::Disconnected, Some("Disconnected"));
759    }
760    /// Set the connected flag to true or false
761    pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
762        {
763            let current = self.get_state();
764            trace!("Set connected: {:?} from {:?}", state, current);
765            if state == ClientState::Connected && current == ClientState::Signedin {
766                self.set_state(ClientState::Signedin);
767            } else {
768                self.set_state(state.clone());
769            }            
770            if state == ClientState::Connecting && !current.eq(&state) {
771                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
772                    self.stats.lock().unwrap().connection_attempts += 1;
773                    let me = self.clone();
774                    tokio::task::spawn(async move {
775                        me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
776                    });
777                    // match tokio::task::Builder::new().name("setconnected-notify-connecting").spawn(async move {
778                    //     me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
779                    // }) {
780                    //     Ok(_) => (),
781                    //     Err(e) => {
782                    //         error!("Failed to spawn setconnected-notify-connecting task: {:?}", e);
783                    //     }
784                    // }
785                }
786
787            }
788            if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) { 
789                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
790                    self.stats.lock().unwrap().connections += 1;
791                    let me = self.clone();
792                    tokio::task::spawn(async move {
793                        me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
794                    });
795                    // match tokio::task::Builder::new().name("setconnected-notify-connected").spawn(async move {
796                    //     me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
797                    // }) {
798                    //     Ok(_) => (),
799                    //     Err(e) => {
800                    //         error!("Failed to spawn setconnected-notify-connected task: {:?}", e);
801                    //     }
802                    // }
803                }
804            }
805            if state == ClientState::Signedin && current != ClientState::Signedin {
806                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
807                    let me = self.clone();
808                    tokio::task::spawn(async move {
809                        me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
810                    });
811                    // match tokio::task::Builder::new().name("set_signedin-notify-connected").spawn(async move {
812                    //     me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
813                    // }) {
814                    //     Ok(_) => (),
815                    //     Err(e) => {
816                    //         error!("Failed to spawn set_signedin-notify-connected task: {:?}", e);
817                    //     }
818                    // };
819                }
820            }
821            if state == ClientState::Disconnected && !current.eq(&state) {
822                if message.is_some() {
823                    info!("Disconnected: {}", message.unwrap());
824                } else {
825                    info!("Disconnected");
826                }
827                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
828                    let me = self.clone();
829                    let message = match message {
830                        Some(message) => message.to_string(),
831                        None => "".to_string(),
832                    };
833                    //if current != ClientState::Connecting {
834                        tokio::task::spawn(async move {
835                            me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
836                        });
837                        // match tokio::task::Builder::new().name("setconnected-notify-disconnected").spawn(async move {
838                        //     trace!("Disconnected: {}", message);
839                        //     me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
840                        // }) {
841                        //     Ok(_) => (),
842                        //     Err(e) => {
843                        //         error!("Failed to spawn setconnected-notify-disconnected task: {:?}", e);
844                        //     }
845                        // }
846                    //}
847                }
848
849                self.kill_handles();
850                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
851                    let client = self.clone();
852                    // match tokio::task::Builder::new().name("kill_handles").spawn(async move {
853                        tokio::task::spawn(async move {
854                        {
855                            let inner = client.inner.lock().await;
856                            let mut queries = inner.queries.lock().await;
857                            let ids = queries.keys().cloned().collect::<Vec<String>>();
858                            debug!("********************************************** Cleaning up");
859                            for id in ids {
860                                let err = ErrorResponse {
861                                    code: 500,
862                                    message: "Disconnected".to_string(),
863                                    stack: "".to_string(),
864                                };
865                                let envelope = err.to_envelope();
866                                let tx = queries.remove(&id).unwrap();
867                                debug!("kill query: {}", id);
868                                let _ = tx.send(envelope);
869                            }
870                            let mut streams = inner.streams.lock().await;
871                            let ids = streams.keys().cloned().collect::<Vec<String>>();
872                            for id in ids {
873                                let tx = streams.remove(&id).unwrap();
874                                debug!("kill stream: {}", id);
875                                let _ = tx.send(Vec::new()).await;
876                            }
877                            let mut queues = inner.queues.lock().await;
878                            let ids = queues.keys().cloned().collect::<Vec<String>>();
879                            for id in ids {
880                                let _ = queues.remove(&id).unwrap();
881                            }
882                            let mut watches = inner.watches.lock().await;
883                            let ids = watches.keys().cloned().collect::<Vec<String>>();
884                            for id in ids {
885                                let _ = watches.remove(&id).unwrap();
886                            }
887                            debug!("**********************************************************");
888                        }
889                        if client.is_auto_reconnect() {
890                            trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
891                            tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
892                            if client.is_auto_reconnect() {
893                                client.inc_reconnect_ms();
894                                // let mut client = client.clone();
895                                trace!("Reconnecting . . .");
896                                client.reconnect().await.unwrap_or_else(|e| {
897                                    error!("Failed to reconnect: {}", e);
898                                    client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
899                                });
900                            } else {
901                                debug!("Not reconnecting");
902                            }
903                        } else {
904                            debug!("Reconnecting disabled, stop now");
905                        }
906                    });
907                    //  {
908                    //     Ok(_) => (),
909                    //     Err(e) => {
910                    //         error!("Failed to spawn kill_handles task: {:?}", e);
911                    //     }
912                    // }
913                }
914    
915            }
916        }
917    }
918    /// Get client state
919    pub fn get_state(&self) -> ClientState {
920        let conn = self.state.lock().unwrap();
921        conn.clone()
922    }
923    /// Set client state
924    pub fn set_state(&self, state: ClientState) {
925        let mut conn = self.state.lock().unwrap();
926        *conn = state;
927    }
928    /// Set the msgcount value
929    pub fn set_msgcount(&self, msgcount: i32) {
930        let mut current = self.msgcount.lock().unwrap();
931        trace!("Set msgcount: {} from {}", msgcount, *current);
932        *current = msgcount;
933    }
934    /// Increment the msgcount value
935    pub fn inc_msgcount(&self) -> i32 {
936        let mut current = self.msgcount.lock().unwrap();
937        *current += 1;
938        *current
939    }
940    /// Return value of reconnect_ms
941    pub fn get_reconnect_ms(&self) -> i32 {
942        let reconnect_ms = self.reconnect_ms.lock().unwrap();
943        *reconnect_ms
944    }
945    /// Increment the reconnect_ms value
946    pub fn reset_reconnect_ms(&self) {
947        let mut current = self.reconnect_ms.lock().unwrap();
948        *current = 500;
949    }
950    /// Increment the reconnect_ms value
951    pub fn inc_reconnect_ms(&self) -> i32 {
952        let mut current = self.reconnect_ms.lock().unwrap();
953        if *current < 30000 {
954            *current += 500;
955        }
956        *current
957    }
958    
959    /// Push tokio task handle to the task_handles vector
960    pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
961        let mut handles = self.task_handles.lock().unwrap();
962        handles.push(handle);
963    }
964    /// Kill all tokio task handles in the task_handles vector
965    pub fn kill_handles(&self) {
966        let mut handles = self.task_handles.lock().unwrap();
967        for handle in handles.iter() {
968            // let id = handle.id();
969            // debug!("Killing handle {}", id);
970            debug!("Killing handle");
971            if !handle.is_finished() {
972                handle.abort();
973            }
974        }
975        handles.clear();
976        // if let Ok(_handle) = tokio::runtime::Handle::try_current() {
977        //     let runtime = self.get_runtime();
978        //     if let Some(rt) = runtime.lock().unwrap().take() {
979        //         rt.shutdown_background();
980        //     }
981        // }
982    }
983
984
985    /// Return value of the msgcount flag
986    #[tracing::instrument(skip_all)]
987    fn get_msgcount(&self) -> i32 {
988        let msgcount = self.msgcount.lock().unwrap();
989        *msgcount
990    }
991
992    /// Set the connect_called flag to true or false
993    #[tracing::instrument(skip_all)]
994    pub fn set_connect_called(&self, connect_called: bool) {
995        let mut current = self.connect_called.lock().unwrap();
996        trace!("Set connect_called: {} from {}", connect_called, *current);
997        *current = connect_called;
998    }
999    /// Return value of the connect_called flag
1000    #[tracing::instrument(skip_all)]
1001    fn is_connect_called(&self) -> bool {
1002        let connect_called = self.connect_called.lock().unwrap();
1003        *connect_called
1004    }
1005    /// Set the auto_reconnect flag to true or false
1006    #[tracing::instrument(skip_all)]
1007    pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1008        let mut current = self.auto_reconnect.lock().unwrap();
1009        trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1010        *current = auto_reconnect;
1011    }
1012    /// Return value of the auto_reconnect flag
1013    #[tracing::instrument(skip_all)]
1014    fn is_auto_reconnect(&self) -> bool {
1015        let auto_reconnect = self.auto_reconnect.lock().unwrap();
1016        *auto_reconnect
1017    }
1018    /// Set the url flag to true or false
1019    #[tracing::instrument(skip_all)]
1020    pub fn set_url(&self, url: &str) {
1021        let mut current = self.url.lock().unwrap();
1022        trace!("Set url: {} from {}", url, *current);
1023        *current = url.to_string();
1024    }
1025    /// Return value of the url string
1026    #[tracing::instrument(skip_all)]
1027    fn get_url(&self) -> String {
1028        let url = self.url.lock().unwrap();
1029        url.to_string()
1030    }
1031    /// Set the username flag to true or false
1032    #[tracing::instrument(skip_all)]
1033    pub fn set_username(&self, username: &str) {
1034        let mut current = self.username.lock().unwrap();
1035        trace!("Set username: {} from {}", username, *current);
1036        *current = username.to_string();
1037    }
1038    /// Return value of the username string
1039    #[tracing::instrument(skip_all)]
1040    fn get_username(&self) -> String {
1041        let username = self.username.lock().unwrap();
1042        username.to_string()
1043    }
1044    /// Set the password value
1045    #[tracing::instrument(skip_all)]
1046    pub fn set_password(&self, password: &str) {
1047        let mut current = self.password.lock().unwrap();
1048        trace!("Set password: {} from {}", password, *current);
1049        *current = password.to_string();
1050    }
1051    /// Return value of the password string
1052    #[tracing::instrument(skip_all)]
1053    fn get_password(&self) -> String {
1054        let password = self.password.lock().unwrap();
1055        password.to_string()
1056    }
1057    /// Set the jwt flag to true or false
1058    #[tracing::instrument(skip_all)]
1059    pub fn set_jwt(&self, jwt: &str) {
1060        let mut current = self.jwt.lock().unwrap();
1061        trace!("Set jwt: {} from {}", jwt, *current);
1062        *current = jwt.to_string();
1063    }
1064    /// Return value of the jwt string
1065    #[tracing::instrument(skip_all)]
1066    fn get_jwt(&self) -> String {
1067        let jwt = self.jwt.lock().unwrap();
1068        jwt.to_string()
1069    }
1070    /// Set the agent flag to true or false
1071    #[tracing::instrument(skip_all)]
1072    pub fn set_agent_name(&self, agent: &str) {
1073        let mut current = self.agent_name.lock().unwrap();
1074        trace!("Set agent: {} from {}", agent, *current);
1075        *current = agent.to_string();
1076    }
1077    /// Return value of the agent string
1078    #[tracing::instrument(skip_all)]
1079    pub fn get_agent_name(&self) -> String {
1080        let agent = self.agent_name.lock().unwrap();
1081        agent.to_string()
1082    }
1083    /// Set the agent version number
1084    #[tracing::instrument(skip_all)]
1085    pub fn set_agent_version(&self, version: &str) {
1086        let mut current = self.agent_version.lock().unwrap();
1087        trace!("Set agent version: {} from {}", version, *current);
1088        *current = version.to_string();
1089    }
1090    /// Return value of the agent version string
1091    #[tracing::instrument(skip_all)]
1092    pub fn get_agent_version(&self) -> String {
1093        let agent_version = self.agent_version.lock().unwrap();
1094        agent_version.to_string()
1095    }
1096    
1097    /// Set the config flag to true or false
1098    #[tracing::instrument(skip_all)]
1099    pub fn set_config(&self, config: Option<Config>) {
1100        let mut current = self.config.lock().unwrap();
1101        *current = config;
1102    }
1103    /// Return value of the config 
1104    #[tracing::instrument(skip_all)]
1105    pub fn get_config(&self) -> Option<Config> {
1106        let config = self.config.lock().unwrap();
1107        config.clone()
1108    }
1109    /// Set the client flag to true or false
1110    #[tracing::instrument(skip_all)]
1111    pub fn set_client(&self, client: ClientEnum) {
1112        let mut current = self.client.lock().unwrap();
1113        *current = client;
1114    }
1115    /// Return value of the client
1116    #[tracing::instrument(skip_all)]
1117    fn get_client(&self) -> ClientEnum {
1118        let client = self.client.lock().unwrap();
1119        client.clone()
1120    }
1121    /// Set the user flag to true or false
1122    #[tracing::instrument(skip_all)]
1123    pub fn set_user(&self, user: Option<User>) {
1124        let mut current = self.user.lock().unwrap();
1125        *current = user;
1126    }
1127    /// Return value of the user
1128    #[tracing::instrument(skip_all)]
1129    pub fn get_user(&self) -> Option<User> {
1130        let user = self.user.lock().unwrap();
1131        user.clone()
1132    }
1133    // /// Return the signed in user, if we are signed in.
1134    // #[tracing::instrument(skip_all)]
1135    // pub fn get_user(&self) -> Option<User> {
1136    //     // let inner = self.inner.lock().await;
1137    //     // inner.user.clone()
1138    //     self.user.clone()
1139    // }
1140    
1141    /// Set the runtime flag to true or false
1142    #[tracing::instrument(skip_all)]
1143    pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1144        let mut current = self.runtime.lock().unwrap();
1145        *current = runtime;
1146    }
1147    /// Return value of the runtime
1148    #[tracing::instrument(skip_all)]
1149    // pub fn get_runtime(&self) -> Option<Arc<tokio::runtime::Runtime>> {
1150    pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1151        self.runtime.as_ref()
1152    }
1153    /// Return value of the runtime handle
1154    #[tracing::instrument(skip_all)]
1155    pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1156        let mut rt = self.runtime.lock().unwrap();
1157        if rt.is_none() {
1158            // println!("Rust: Initializing new Tokio runtime");
1159            let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1160            *rt = Some(runtime);
1161        } else {
1162            // println!("Rust: Runtime already initialized");
1163        }
1164        rt.as_ref().unwrap().handle().clone()
1165    }
1166    /// Method to allow the user to subscribe with a callback function
1167    #[tracing::instrument(skip_all)]
1168    pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1169    {
1170        // call the callback function every time there is an event in the client.event_receiver
1171        let event_receiver = self.event_receiver.clone();
1172        let callback = callback;
1173        let _handle =  tokio::task::spawn(async move {
1174            while let Ok(event) = event_receiver.recv().await {
1175                callback(event);
1176            }
1177        }); // .unwrap();
1178    }
1179    /// Internal function, used to generate a unique id for each message sent to the server.
1180    #[tracing::instrument(skip_all)]
1181    pub fn get_uniqueid() -> String {
1182        static COUNTER: AtomicUsize = AtomicUsize::new(1);
1183        let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1184        let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1185        let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1186        let sqids = Sqids::default();
1187        sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1188    }
1189    /// Internal function, Send a message to the OpenIAP server, and wait for a response.
1190    #[tracing::instrument(skip_all)]
1191    async fn send(&self, msg: Envelope) -> Result<Envelope, OpenIAPError> {
1192        let response = self.send_noawait(msg).await;
1193        match response {
1194            Ok((response_rx, id)) => {
1195                // Await the response
1196                let response = response_rx.await;
1197    
1198                // Remove the entry from `inner.queries` after awaiting
1199                let inner = self.inner.lock().await;
1200                inner.queries.lock().await.remove(&id);
1201    
1202                // Handle the result of the await
1203                match response {
1204                    Ok(response) => Ok(response),
1205                    Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1206                }
1207            }
1208            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1209        }
1210    }
1211    /// Internal function, Send a message to the OpenIAP server, and do not wait for a response.
1212    /// used when sending a stream of data, or when we do not need a response.
1213    #[tracing::instrument(skip_all)]
1214    async fn send_noawait(
1215        &self,
1216        mut msg: Envelope,
1217    ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1218        let (response_tx, response_rx) = oneshot::channel();
1219        let id = Client::get_uniqueid();
1220        msg.id = id.clone();
1221    
1222        // Lock and insert the sender into `inner.queries`
1223        {
1224            let inner = self.inner.lock().await;
1225            inner.queries.lock().await.insert(id.clone(), response_tx);
1226        }
1227    
1228        // Send the message and check for errors
1229        let res = self.send_envelope(msg).await;
1230        if let Err(e) = res {
1231            // Remove the entry from `inner.queries` if the send fails
1232            let inner = self.inner.lock().await;
1233            inner.queries.lock().await.remove(&id);
1234            return Err(OpenIAPError::ClientError(e.to_string()));
1235        }
1236    
1237        Ok((response_rx, id))
1238    }
1239    /// Internal function, Setup a new stream, send a message to the OpenIAP server, and return a stream to send and receive data.
1240    #[tracing::instrument(skip_all)]
1241    async fn sendwithstream(
1242        &self,
1243        mut msg: Envelope,
1244    ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1245        let (response_tx, response_rx) = oneshot::channel();
1246        let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1247        let id = Client::get_uniqueid();
1248        msg.id = id.clone();
1249        {
1250            let inner = self.inner.lock().await;
1251            inner.queries.lock().await.insert(id.clone(), response_tx);
1252            inner.streams.lock().await.insert(id.clone(), stream_tx);
1253            let res = self.send_envelope(msg).await;
1254            match res {
1255                Ok(_) => (),
1256                Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1257            }
1258        }
1259        Ok((response_rx, stream_rx))
1260    }
1261    #[tracing::instrument(skip_all, target = "openiap::client")]
1262    async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1263        if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin ) 
1264            && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1265            return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1266        }
1267        let env = envelope.clone();
1268        let command = envelope.command.clone();
1269        self.stats.lock().unwrap().package_tx += 1;
1270        match command.as_str() {
1271            "signin" => { self.stats.lock().unwrap().signin += 1;},
1272            "upload" => { self.stats.lock().unwrap().upload += 1;},
1273            "download" => { self.stats.lock().unwrap().download += 1;},
1274            "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1275            "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1276            "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1277            "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1278            "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1279            "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1280            "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1281
1282            "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1283            "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1284            "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1285            "watch" => { self.stats.lock().unwrap().watch += 1;},
1286            "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1287            "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1288
1289            "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1290            "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1291            "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1292            "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1293            "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1294            "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1295            "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1296            "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1297
1298            "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1299            "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1300            "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1301            "query" => { self.stats.lock().unwrap().query += 1;},
1302            "count" => { self.stats.lock().unwrap().count += 1;},
1303            "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1304            "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1305            "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1306            "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1307            "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1308            "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1309            "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1310            "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1311            "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1312            "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1313            _ => {}
1314        };
1315        if envelope.id.is_empty() {
1316            let id = Client::get_uniqueid();
1317            envelope.id = id.clone();
1318        }
1319        trace!("Sending {} message, in the thread", command);
1320        let res = self.out_envelope_sender.send(env).await;
1321        if res.is_err() {
1322            error!("{:?}", res);
1323            let errmsg = res.unwrap_err().to_string();
1324            self.set_connected(ClientState::Disconnected, Some(&errmsg));
1325            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1326        } else {
1327            return Ok(())
1328        }
1329    }
1330    #[tracing::instrument(skip_all, target = "openiap::client")]
1331    async fn parse_incomming_envelope(&self, received: Envelope) {
1332        self.stats.lock().unwrap().package_rx += 1;
1333        let command = received.command.clone();
1334        trace!("parse_incomming_envelope, command: {}", command);
1335        let inner = self.inner.lock().await;
1336        let rid = received.rid.clone();
1337        let mut queries = inner.queries.lock().await;
1338        let mut streams = inner.streams.lock().await;
1339        let watches = inner.watches.lock().await;
1340        let queues = inner.queues.lock().await;
1341    
1342        if command != "ping" && command != "pong" && command != "refreshtoken" {
1343            if rid.is_empty() {
1344                debug!("Received #{} #{} {} message", received.seq, received.id, command);
1345            } else {
1346                debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1347            }
1348        } else if rid.is_empty() {
1349            trace!("Received #{} #{} {} message", received.seq, received.id, command);
1350        } else {
1351            trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1352        }
1353        
1354        if command == "ping" {
1355            self.pong(&received.id).await;
1356            // self.event_sender.send(crate::ClientEvent::Ping).await.unwrap();
1357        } else if command == "refreshtoken" {
1358            // TODO: Do we store the new jwt at some point in the future
1359        } else if command == "beginstream"
1360            || command == "stream"
1361            || command == "endstream"
1362        {
1363            let streamresponse: Stream =
1364                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1365            let streamdata = streamresponse.data;
1366            if !streamdata.is_empty() {
1367                let stream = streams.get(rid.as_str()).unwrap();
1368    
1369                match stream.send(streamdata).await {
1370                    Ok(_) => _ = (),
1371                    Err(e) => error!("Failed to send data: {}", e),
1372                }
1373            }
1374            if command == "endstream" {
1375                let _ = streams.remove(rid.as_str());
1376            }
1377        } else if command == "watchevent" {
1378            let watchevent: WatchEvent =
1379                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1380            if let Some(callback) = watches.get(watchevent.id.as_str()) {
1381                callback(watchevent);
1382            }
1383        } else if command == "queueevent" {
1384            let queueevent: QueueEvent =
1385                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1386            if let Some(callback) = queues.get(queueevent.queuename.as_str()) {
1387                callback(queueevent);
1388            }
1389        } else if let Some(response_tx) = queries.remove(&rid) {
1390            let stream = streams.get(rid.as_str());
1391            if let Some(stream) = stream {
1392                let streamdata = vec![];
1393                match stream.send(streamdata).await {
1394                    Ok(_) => _ = (),
1395                    Err(e) => error!("Failed to send data: {}", e),
1396                }
1397            }
1398            let _ = response_tx.send(received);
1399        } else {
1400            error!("Received unhandled {} message: {:?}", command, received);
1401        }    
1402    }    
1403    /// Internal function, used to send a fake getelement to the OpenIAP server.
1404    #[tracing::instrument(skip_all)]
1405    async fn get_element(&self) -> Result<(), OpenIAPError> {
1406        let id = Client::get_uniqueid();
1407        let envelope = Envelope {
1408            id: id.clone(),
1409            command: "getelement".into(),
1410            ..Default::default()
1411        };
1412        let result = match self.send(envelope).await {
1413            Ok(res) => res,
1414            Err(e) => {
1415                return Err(e);
1416            },
1417        };
1418        if result.command == "pong" || result.command == "getelement" {
1419            Ok(())
1420        } else if result.command == "error" {
1421            let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1422            Err(OpenIAPError::ServerError(e.message))
1423        } else {
1424            Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1425        }
1426    }
1427    /// Internal function, used to send a ping to the OpenIAP server.
1428    #[tracing::instrument(skip_all)]
1429    async fn ping(&self) -> Result<(), OpenIAPError> {
1430        let id = Client::get_uniqueid();
1431        let envelope = Envelope {
1432            id: id.clone(),
1433            command: "getelement".into(),
1434            ..Default::default()
1435        };
1436        match self.send_envelope(envelope).await {
1437            Ok(_res) => Ok(()),
1438            Err(e) => {
1439                return Err(e);
1440            },
1441        }
1442    }
1443    /// Internal function, used to send a pong response to the OpenIAP server.
1444    #[tracing::instrument(skip_all)]
1445    async fn pong(&self, rid: &str) {
1446        let id = Client::get_uniqueid();
1447        let envelope = Envelope {
1448            id: id.clone(),
1449            command: "pong".into(),
1450            rid: rid.to_string(),
1451            ..Default::default()
1452        };
1453        match self.send_envelope(envelope).await {
1454            Ok(_) => (),
1455            Err(e) => error!("Failed to send pong: {}", e),
1456        }
1457    }
1458    /// Sign in to the OpenIAP service. \
1459    /// If no username and password is provided, it will attempt to use environment variables.\
1460    /// if config is set to validateonly, it will only validate the credentials, but not sign in.\
1461    /// If no jwt, username and password is provided, it will attempt to use environment variables.\
1462    /// will prefere OPENIAP_JWT (or jwt) over OPENIAP_USERNAME and OPENIAP_PASSWORD.
1463    #[tracing::instrument(skip_all)]
1464    pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1465        // autodetect how to signin using environment variables
1466        if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1467            if config.jwt.is_empty() {
1468                config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1469            }
1470            if config.jwt.is_empty() {
1471                config.jwt = std::env::var("jwt").unwrap_or_default();
1472            }
1473            // if no jwt was found, test for username and password
1474            if config.jwt.is_empty() {
1475                if config.username.is_empty() {
1476                    config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1477                }
1478                if config.password.is_empty() {
1479                    config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1480                }
1481            }
1482        }
1483        let version = env!("CARGO_PKG_VERSION");
1484        if !version.is_empty() && config.version.is_empty() {
1485            config.version = version.to_string();
1486        }
1487        if config.agent.is_empty() {
1488            config.agent = self.get_agent_name();
1489        }
1490
1491        debug!("Attempting sign-in using {:?}", config);
1492        let envelope = config.to_envelope();
1493        let result = self.send(envelope).await;
1494
1495        match &result {
1496            Ok(m) => {
1497                debug!("Sign-in reply received");
1498                if m.command == "error" {
1499                    let e: ErrorResponse =
1500                        prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1501                            .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1502                    return Err(OpenIAPError::ServerError(e.message));
1503                }
1504                debug!("Sign-in successful");
1505                let response: SigninResponse =
1506                    prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1507                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1508                if !config.validateonly {
1509                    self.set_connected(ClientState::Signedin, None);
1510                    self.set_user(Some(response.user.as_ref().unwrap().clone()));
1511                }
1512                Ok(response)
1513            }
1514            Err(e) => {
1515                debug!("Sending Sign-in request failed {:?}", result);
1516                debug!("Sign-in failed: {}", e.to_string());
1517                if !config.validateonly {
1518                    self.set_user(None);
1519                }
1520                Err(OpenIAPError::ClientError(e.to_string()))
1521            }
1522        }
1523    }
1524    /// Return a list of collections in the database
1525    /// - includehist: include historical collections, default is false.
1526    /// please see create_collection for examples on how to create collections.
1527    #[tracing::instrument(skip_all)]
1528    pub async fn list_collections(&self, includehist: bool) -> Result<String, OpenIAPError> {
1529        let config = ListCollectionsRequest::new(includehist);
1530        let envelope = config.to_envelope();
1531        let result = self.send(envelope).await;
1532        match result {
1533            Ok(m) => {
1534                let data = match m.data {
1535                    Some(data) => data,
1536                    None => {
1537                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1538                    }
1539                };
1540                if m.command == "error" {
1541                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1542                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1543                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1544                }
1545                let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1546                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1547                Ok(response.results)
1548            }
1549            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1550        }
1551    }
1552    /// Create a new collection in the database.
1553    /// You can create a collection by simply adding a new document to it using [Client::insert_one].
1554    /// Or you can create a collecting using the following example:
1555    /// ```
1556    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1557    /// #[tokio::main]
1558    /// async fn main() -> Result<(), OpenIAPError> {
1559    ///     let client = Client::new_connect("").await?;
1560    ///     //let collections = client.list_collections(false).await?;
1561    ///     //println!("Collections: {}", collections);
1562    ///     let config = CreateCollectionRequest::byname("rusttestcollection");
1563    ///     client.create_collection(config).await?;
1564    ///     let config = DropCollectionRequest::byname("rusttestcollection");
1565    ///     client.drop_collection(config).await?;
1566    ///     Ok(())
1567    /// }
1568    /// ```
1569    /// You can create a normal collection with a TTL index on the _created field, using the following example:
1570    /// ```
1571    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1572    /// #[tokio::main]
1573    /// async fn main() -> Result<(), OpenIAPError> {
1574    ///     let client = Client::new_connect("").await?;
1575    ///     let config = CreateCollectionRequest::with_ttl(
1576    ///         "rusttestttlcollection",
1577    ///         60
1578    ///     );
1579    ///     client.create_collection(config).await?;
1580    ///     let config = DropCollectionRequest::byname("rusttestttlcollection");
1581    ///     client.drop_collection(config).await?;
1582    ///     Ok(())
1583    /// }
1584    /// ```
1585    /// You can create a time series collection using the following example:
1586    /// granularity can be one of: seconds, minutes, hours
1587    /// ```
1588    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1589    /// #[tokio::main]
1590    /// async fn main() -> Result<(), OpenIAPError> {
1591    ///     let client = Client::new_connect("").await?;
1592    ///     let config = CreateCollectionRequest::timeseries(
1593    ///         "rusttesttscollection2",
1594    ///         "_created",
1595    ///         "minutes"
1596    ///     );
1597    ///     client.create_collection(config).await?;
1598    ///     let config = DropCollectionRequest::byname("rusttesttscollection2");
1599    ///     client.drop_collection(config).await?;
1600    ///     Ok(())
1601    /// }
1602    /// ```
1603    #[tracing::instrument(skip_all)]
1604    pub async fn create_collection(
1605        &self,
1606        config: CreateCollectionRequest,
1607    ) -> Result<(), OpenIAPError> {
1608        if config.collectionname.is_empty() {
1609            return Err(OpenIAPError::ClientError(
1610                "No collection name provided".to_string(),
1611            ));
1612        }
1613        let envelope = config.to_envelope();
1614        let result = self.send(envelope).await;
1615        match result {
1616            Ok(m) => {
1617                let data = match m.data {
1618                    Some(data) => data,
1619                    None => {
1620                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1621                    }
1622                };
1623                if m.command == "error" {
1624                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1625                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1626                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1627                }
1628                Ok(())
1629            }
1630            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1631        }
1632    }
1633    /// Drop a collection from the database, this will delete all data and indexes for the collection.
1634    /// See [Client::create_collection] for examples on how to create a collection.
1635    #[tracing::instrument(skip_all)]
1636    pub async fn drop_collection(&self, config: DropCollectionRequest) -> Result<(), OpenIAPError> {
1637        if config.collectionname.is_empty() {
1638            return Err(OpenIAPError::ClientError(
1639                "No collection name provided".to_string(),
1640            ));
1641        }
1642        let envelope = config.to_envelope();
1643        let result = self.send(envelope).await;
1644        match result {
1645            Ok(m) => {
1646                let data = match m.data {
1647                    Some(data) => data,
1648                    None => {
1649                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1650                    }
1651                };
1652                if m.command == "error" {
1653                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1654                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1655                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1656                }
1657                Ok(())
1658            }
1659            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1660        }
1661    }
1662    /// Return all indexes for a collection in the database
1663    /// ```
1664    /// use openiap_client::{Client, GetIndexesRequest, OpenIAPError};
1665    /// #[tokio::main]
1666    /// async fn main() -> Result<(), OpenIAPError> {
1667    ///     let client = Client::new_connect("").await?;
1668    ///     let config = GetIndexesRequest::bycollectionname("rustindextestcollection");
1669    ///     let indexes = client.get_indexes(config).await?;
1670    ///     println!("Indexes: {}", indexes);
1671    ///     Ok(())
1672    /// }
1673    /// ```
1674    /// 
1675    pub async fn get_indexes(&self, config: GetIndexesRequest) -> Result<String, OpenIAPError> {
1676        if config.collectionname.is_empty() {
1677            return Err(OpenIAPError::ClientError(
1678                "No collection name provided".to_string(),
1679            ));
1680        }
1681        let envelope = config.to_envelope();
1682        let result = self.send(envelope).await;
1683        match result {
1684            Ok(m) => {
1685                let data = match m.data {
1686                    Some(data) => data,
1687                    None => {
1688                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1689                    }
1690                };
1691                if m.command == "error" {
1692                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1693                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1694                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1695                }
1696                let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1697                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1698                Ok(response.results)
1699            }
1700            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1701        }
1702    }
1703    /// Create an index in the database.
1704    /// Example of creating an index on the name field in the rustindextestcollection collection, and then dropping it again:
1705    /// ```
1706    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1707    /// #[tokio::main]
1708    /// async fn main() -> Result<(), OpenIAPError> {
1709    ///     let client = Client::new_connect("").await?;
1710    ///     let config = CreateIndexRequest::bycollectionname(
1711    ///         "rustindextestcollection",
1712    ///         "{\"name\": 1}"
1713    ///     );
1714    ///     client.create_index(config).await?;
1715    ///     let config = DropIndexRequest::bycollectionname(
1716    ///         "rustindextestcollection",
1717    ///         "name_1"
1718    ///     );
1719    ///     client.drop_index(config).await?;
1720    ///     Ok(())
1721    /// }
1722    /// ```
1723    /// Example of creating an unique index on the address field in the rustindextestcollection collection, and then dropping it again:
1724    /// ```
1725    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1726    /// #[tokio::main]
1727    /// async fn main() -> Result<(), OpenIAPError> {
1728    ///     let client = Client::new_connect("").await?;
1729    ///     let mut config = CreateIndexRequest::bycollectionname(
1730    ///         "rustindextestcollection",
1731    ///         "{\"address\": 1}"
1732    ///     );
1733    ///     config.options = "{\"unique\": true}".to_string();
1734    ///     client.create_index(config).await?;
1735    ///     let config = DropIndexRequest::bycollectionname(
1736    ///         "rustindextestcollection",
1737    ///         "address_1"
1738    ///     );
1739    ///     client.drop_index(config).await?;
1740    ///     Ok(())
1741    /// }
1742    /// ```
1743    pub async fn create_index(&self, config: CreateIndexRequest) -> Result<(), OpenIAPError> {
1744        if config.collectionname.is_empty() {
1745            return Err(OpenIAPError::ClientError(
1746                "No collection name provided".to_string(),
1747            ));
1748        }
1749        if config.index.is_empty() {
1750            return Err(OpenIAPError::ClientError(
1751                "No index was provided".to_string(),
1752            ));
1753        }
1754        let envelope = config.to_envelope();
1755        let result = self.send(envelope).await;
1756        match result {
1757            Ok(m) => {
1758                let data = match m.data {
1759                    Some(data) => data,
1760                    None => {
1761                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1762                    }
1763                };
1764                if m.command == "error" {
1765                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1766                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1767                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1768                }
1769                Ok(())
1770            }
1771            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1772        }
1773    }
1774    /// Drop an index from the database
1775    /// See [Client::create_index] for an example on how to create and drop an index.
1776    pub async fn drop_index(&self, config: DropIndexRequest) -> Result<(), OpenIAPError> {
1777        if config.collectionname.is_empty() {
1778            return Err(OpenIAPError::ClientError(
1779                "No collection name provided".to_string(),
1780            ));
1781        }
1782        if config.name.is_empty() {
1783            return Err(OpenIAPError::ClientError(
1784                "No index name provided".to_string(),
1785            ));
1786        }
1787        let envelope = config.to_envelope();
1788        let result = self.send(envelope).await;
1789        match result {
1790            Ok(m) => {
1791                let data = match m.data {
1792                    Some(data) => data,
1793                    None => {
1794                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1795                    }
1796                };
1797                if m.command == "error" {
1798                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1799                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1800                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1801                }
1802                Ok(())
1803            }
1804            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1805        }
1806    }
1807    /// To query all documents in the entities collection where _type is test, you can use the following example:
1808    /// ```
1809    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1810    /// #[tokio::main]
1811    /// async fn main() -> Result<(), OpenIAPError> {
1812    ///     let client = Client::new_connect("").await?;
1813    ///     let q = client.query( QueryRequest::with_query(
1814    ///         "entities",
1815    ///         "{\"_type\":\"test\"}"
1816    ///     )).await?;
1817    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1818    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1819    ///     for item in items {
1820    ///         println!("Item: {:?}", item);
1821    ///     }
1822    ///     Ok(())
1823    /// }
1824    /// ```
1825    /// To query all documents in the entities collection, and only return the name and _id field for all documents, you can use the following example:
1826    /// ```
1827    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1828    /// #[tokio::main]
1829    /// async fn main() -> Result<(), OpenIAPError> {
1830    ///     let client = Client::new_connect("").await?;
1831    ///     let q = client.query( QueryRequest::with_projection(
1832    ///         "entities",
1833    ///         "{}",
1834    ///         "{\"name\":1}"
1835    ///     )).await?;
1836    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1837    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1838    ///     for item in items {
1839    ///         println!("Item: {:?}", item);
1840    ///     }
1841    ///     Ok(())
1842    /// }
1843    /// ```
1844    #[tracing::instrument(skip_all)]
1845    pub async fn query(&self, mut config: QueryRequest) -> Result<QueryResponse, OpenIAPError> {
1846        if config.collectionname.is_empty() {
1847            config.collectionname = "entities".to_string();
1848        }
1849
1850        let envelope = config.to_envelope();
1851        let result = self.send(envelope).await;
1852        match result {
1853            Ok(m) => {
1854                let data = match m.data {
1855                    Some(data) => data,
1856                    None => {
1857                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1858                    }
1859                };
1860                if m.command == "error" {
1861                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1862                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1863                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1864                }
1865                let response: QueryResponse = prost::Message::decode(data.value.as_ref())
1866                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1867                debug!("Return Ok(response)");
1868                Ok(response)
1869            }
1870            Err(e) => {
1871                debug!("Error !!");
1872                Err(OpenIAPError::ClientError(e.to_string()))
1873            }
1874        }
1875    }
1876    /// Try and get a single document from the database.\
1877    /// If no document is found, it will return None.
1878    /// ```
1879    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1880    /// #[tokio::main]
1881    /// async fn main() -> Result<(), OpenIAPError> {
1882    ///     let client = Client::new_connect("").await?;
1883    ///     let config = QueryRequest::with_query(
1884    ///         "users",
1885    ///         "{\"username\":\"guest\"}"
1886    ///     );
1887    ///     let item = client.get_one(config).await;
1888    ///     match item {
1889    ///         Some(item) => {
1890    ///             assert_eq!(item["username"], "guest");
1891    ///             println!("Item: {:?}", item);
1892    ///         }
1893    ///         None => {
1894    ///             println!("No item found");
1895    ///             assert!(false, "No item found");
1896    ///         }
1897    ///     }
1898    ///     Ok(())
1899    /// }
1900    /// ```
1901    #[tracing::instrument(skip_all)]
1902    pub async fn get_one(&self, mut config: QueryRequest) -> Option<serde_json::Value> {
1903        if config.collectionname.is_empty() {
1904            config.collectionname = "entities".to_string();
1905        }
1906        config.top = 1;
1907        let envelope = config.to_envelope();
1908        let result = self.send(envelope).await;
1909        match result {
1910            Ok(m) => {
1911                let data = match m.data {
1912                    Some(data) => data,
1913                    None => return None,
1914                };
1915                if m.command == "error" {
1916                    return None;
1917                }
1918                let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
1919
1920                let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
1921                let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1922                if items.is_empty() {
1923                    return None;
1924                }
1925                let item = items[0].clone();
1926                Some(item)
1927            }
1928            Err(_) => None,
1929        }
1930    }
1931
1932    /// Try and get a specefic version of a document from the database, reconstructing it from the history collection
1933    /// ```
1934    /// use openiap_client::{OpenIAPError, Client, GetDocumentVersionRequest, InsertOneRequest, UpdateOneRequest};
1935    /// #[tokio::main]
1936    /// async fn main() -> Result<(), OpenIAPError> {
1937    ///     let client = Client::new_connect("").await?;
1938    ///     let item = "{\"name\": \"test from rust\", \"_type\": \"test\"}";
1939    ///     let query = InsertOneRequest {
1940    ///         collectionname: "entities".to_string(),
1941    ///         item: item.to_string(),
1942    ///         j: true,
1943    ///         w: 2,
1944    ///         ..Default::default()
1945    ///     };
1946    ///     let response = client.insert_one(query).await;
1947    ///     let response = match response {
1948    ///         Ok(r) => r,
1949    ///         Err(e) => {
1950    ///             println!("Error: {:?}", e);
1951    ///             assert!(false, "insert_one failed with {:?}", e);
1952    ///             return Ok(());
1953    ///         }
1954    ///     };
1955    ///     let _obj: serde_json::Value = serde_json::from_str(&response.result).unwrap();
1956    ///     let _id = _obj["_id"].as_str().unwrap();
1957    ///     let item = format!("{{\"name\":\"updated from rust\", \"_id\": \"{}\"}}", _id);
1958    ///     let query = UpdateOneRequest {
1959    ///         collectionname: "entities".to_string(),
1960    ///         item: item.to_string(),
1961    ///         ..Default::default()
1962    ///     };
1963    ///     let response = client.update_one(query).await;
1964    ///     _ = match response {
1965    ///         Ok(r) => r,
1966    ///         Err(e) => {
1967    ///             println!("Error: {:?}", e);
1968    ///             assert!(false, "update_one failed with {:?}", e);
1969    ///             return Ok(());
1970    ///         }
1971    ///     };
1972    ///     let query = GetDocumentVersionRequest {
1973    ///         collectionname: "entities".to_string(),
1974    ///         id: _id.to_string(),
1975    ///         version: 0,
1976    ///         ..Default::default()
1977    ///     };
1978    ///     let response = client.get_document_version(query).await;
1979    ///     let response = match response {
1980    ///         Ok(r) => r,
1981    ///         Err(e) => {
1982    ///             println!("Error: {:?}", e);
1983    ///             assert!(false, "get_document_version failed with {:?}", e);
1984    ///             return Ok(());
1985    ///         }
1986    ///     };
1987    ///     let _obj = serde_json::from_str(&response);
1988    ///     let _obj: serde_json::Value = match _obj {
1989    ///         Ok(r) => r,
1990    ///         Err(e) => {
1991    ///             println!("Error: {:?}", e);
1992    ///             assert!(
1993    ///                 false,
1994    ///                 "parse get_document_version result failed with {:?}",
1995    ///                 e
1996    ///             );
1997    ///             return Ok(());
1998    ///         }
1999    ///     };
2000    ///     let name = _obj["name"].as_str().unwrap();
2001    ///     let version = _obj["_version"].as_i64().unwrap();
2002    ///     println!("version 0 Name: {}, Version: {}", name, version);
2003    ///     assert_eq!(name, "test from rust");
2004    ///     let query = GetDocumentVersionRequest {
2005    ///         collectionname: "entities".to_string(),
2006    ///         id: _id.to_string(),
2007    ///         version: 1,
2008    ///         ..Default::default()
2009    ///     };
2010    ///     let response = client.get_document_version(query).await;
2011    ///     assert!(
2012    ///         response.is_ok(),
2013    ///         "test_get_document_version failed with {:?}",
2014    ///         response.err().unwrap()
2015    ///     );
2016    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2017    ///     let name = _obj["name"].as_str().unwrap();
2018    ///     let version = _obj["_version"].as_i64().unwrap();
2019    ///     println!("version 1 Name: {}, Version: {}", name, version);
2020    ///     assert_eq!(name, "updated from rust");
2021    ///     let query = GetDocumentVersionRequest {
2022    ///         collectionname: "entities".to_string(),
2023    ///         id: _id.to_string(),
2024    ///         version: -1,
2025    ///         ..Default::default()
2026    ///     };
2027    ///     let response = client.get_document_version(query).await;
2028    ///     assert!(
2029    ///         response.is_ok(),
2030    ///         "test_get_document_version failed with {:?}",
2031    ///         response.err().unwrap()
2032    ///     );
2033    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2034    ///     let name = _obj["name"].as_str().unwrap();
2035    ///     let version = _obj["_version"].as_i64().unwrap();
2036    ///     println!("version -1 Name: {}, Version: {}", name, version);
2037    ///     assert_eq!(name, "updated from rust");
2038    ///     Ok(())
2039    /// }
2040    /// ```
2041    #[tracing::instrument(skip_all)]
2042    pub async fn get_document_version(
2043        &self,
2044        mut config: GetDocumentVersionRequest,
2045    ) -> Result<String, OpenIAPError> {
2046        if config.collectionname.is_empty() {
2047            config.collectionname = "entities".to_string();
2048        }
2049        if config.id.is_empty() {
2050            return Err(OpenIAPError::ClientError("No id provided".to_string()));
2051        }
2052        let envelope = config.to_envelope();
2053        let result = self.send(envelope).await;
2054        match result {
2055            Ok(m) => {
2056                let data = match m.data {
2057                    Some(data) => data,
2058                    None => {
2059                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2060                    }
2061                };
2062                if m.command == "error" {
2063                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2064                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2065                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2066                }
2067                let response: GetDocumentVersionResponse =
2068                    prost::Message::decode(data.value.as_ref())
2069                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2070                Ok(response.result)
2071            }
2072            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2073        }
2074    }
2075    /// Run an aggregate pipeline towards the database
2076    /// Example of running an aggregate pipeline on the entities collection, counting the number of documents with _type=test, and grouping them by name:
2077    /// ```
2078    /// use openiap_client::{OpenIAPError, Client, AggregateRequest};
2079    /// #[tokio::main]
2080    /// async fn main() -> Result<(), OpenIAPError> {
2081    ///    let client = Client::new_connect("").await?;
2082    ///     let config = AggregateRequest {
2083    ///         collectionname: "entities".to_string(),
2084    ///         aggregates: "[{\"$match\": {\"_type\": \"test\"}}, {\"$group\": {\"_id\": \"$name\", \"count\": {\"$sum\": 1}}}]".to_string(),
2085    ///         ..Default::default()
2086    ///     };
2087    ///     let response = client.aggregate(config).await?;
2088    ///     println!("Response: {:?}", response);
2089    ///     Ok(())
2090    /// }
2091    /// ```
2092    /// 
2093    #[tracing::instrument(skip_all)]
2094    pub async fn aggregate(
2095        &self,
2096        mut config: AggregateRequest,
2097    ) -> Result<AggregateResponse, OpenIAPError> {
2098        if config.collectionname.is_empty() {
2099            config.collectionname = "entities".to_string();
2100        }
2101        if config.hint.is_empty() {
2102            config.hint = "".to_string();
2103        }
2104        if config.queryas.is_empty() {
2105            config.queryas = "".to_string();
2106        }
2107        if config.aggregates.is_empty() {
2108            return Err(OpenIAPError::ClientError(
2109                "No aggregates provided".to_string(),
2110            ));
2111        }
2112        let envelope = config.to_envelope();
2113        let result = self.send(envelope).await;
2114        match result {
2115            Ok(m) => {
2116                let data = match m.data {
2117                    Some(data) => data,
2118                    None => {
2119                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2120                    }
2121                };
2122                if m.command == "error" {
2123                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2124                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2125                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2126                }
2127                let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2128                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2129                Ok(response)
2130            }
2131            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2132        }
2133    }
2134    /// Count the number of documents in a collection, with an optional query
2135    #[tracing::instrument(skip_all)]
2136    pub async fn count(&self, mut config: CountRequest) -> Result<CountResponse, OpenIAPError> {
2137        if config.collectionname.is_empty() {
2138            config.collectionname = "entities".to_string();
2139        }
2140        if config.query.is_empty() {
2141            config.query = "{}".to_string();
2142        }
2143        let envelope = config.to_envelope();
2144        let result = self.send(envelope).await;
2145        match result {
2146            Ok(m) => {
2147                let data = match m.data {
2148                    Some(data) => data,
2149                    None => {
2150                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2151                    }
2152                };
2153                if m.command == "error" {
2154                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2155                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2156                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2157                }
2158                let response: CountResponse = prost::Message::decode(data.value.as_ref())
2159                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2160                Ok(response)
2161            }
2162            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2163        }
2164    }
2165    /// Get distinct values for a field in a collection, with an optional query
2166    #[tracing::instrument(skip_all)]
2167    pub async fn distinct(
2168        &self,
2169        mut config: DistinctRequest,
2170    ) -> Result<DistinctResponse, OpenIAPError> {
2171        if config.collectionname.is_empty() {
2172            config.collectionname = "entities".to_string();
2173        }
2174        if config.query.is_empty() {
2175            config.query = "{}".to_string();
2176        }
2177        if config.field.is_empty() {
2178            return Err(OpenIAPError::ClientError("No field provided".to_string()));
2179        }
2180        let envelope = config.to_envelope();
2181        let result = self.send(envelope).await;
2182        match result {
2183            Ok(m) => {
2184                let data = match m.data {
2185                    Some(data) => data,
2186                    None => {
2187                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2188                    }
2189                };
2190                if m.command == "error" {
2191                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2192                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2193                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2194                }
2195                let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2196                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2197                Ok(response)
2198            }
2199            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2200        }
2201    }
2202    /// Insert a document into a collection
2203    #[tracing::instrument(skip_all)]
2204    pub async fn insert_one(
2205        &self,
2206        config: InsertOneRequest,
2207    ) -> Result<InsertOneResponse, OpenIAPError> {
2208        let envelope = config.to_envelope();
2209        let result = self.send(envelope).await;
2210        match result {
2211            Ok(m) => {
2212                let data = match m.data {
2213                    Some(data) => data,
2214                    None => {
2215                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2216                    }
2217                };
2218                if m.command == "error" {
2219                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2220                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2221                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2222                }
2223                let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2224                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2225                Ok(response)
2226            }
2227            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2228        }
2229    }
2230    /// Insert many documents into a collection
2231    #[tracing::instrument(skip_all)]
2232    pub async fn insert_many(
2233        &self,
2234        config: InsertManyRequest,
2235    ) -> Result<InsertManyResponse, OpenIAPError> {
2236        let envelope = config.to_envelope();
2237        let result = self.send(envelope).await;
2238        match result {
2239            Ok(m) => {
2240                let data = match m.data {
2241                    Some(data) => data,
2242                    None => {
2243                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2244                    }
2245                };
2246                if m.command == "error" {
2247                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2248                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2249                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2250                }
2251                let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2252                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2253                Ok(response)
2254            }
2255            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2256        }
2257    }
2258    /// Update ( replace ) a document in a collection
2259    #[tracing::instrument(skip_all)]
2260    pub async fn update_one(
2261        &self,
2262        config: UpdateOneRequest,
2263    ) -> Result<UpdateOneResponse, OpenIAPError> {
2264        let envelope = config.to_envelope();
2265        let result = self.send(envelope).await;
2266        match result {
2267            Ok(m) => {
2268                let data = match m.data {
2269                    Some(data) => data,
2270                    None => {
2271                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2272                    }
2273                };
2274                if m.command == "error" {
2275                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2276                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2277                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2278                }
2279                let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2280                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2281                Ok(response)
2282            }
2283            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2284        }
2285    }
2286    /// Using a unique key, insert a document or update it if it already exists ( upsert on steroids )
2287    #[tracing::instrument(skip_all)]
2288    pub async fn insert_or_update_one(
2289        &self,
2290        config: InsertOrUpdateOneRequest,
2291    ) -> Result<String, OpenIAPError> {
2292        let envelope = config.to_envelope();
2293        let result = self.send(envelope).await;
2294        match result {
2295            Ok(m) => {
2296                let data = match m.data {
2297                    Some(data) => data,
2298                    None => {
2299                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2300                    }
2301                };
2302                if m.command == "error" {
2303                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2304                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2305                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2306                }
2307                let response: InsertOrUpdateOneResponse =
2308                    prost::Message::decode(data.value.as_ref())
2309                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2310                Ok(response.result)
2311            }
2312            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2313        }
2314    }
2315    /// Using a unique key, insert many documents or update them if they already exist ( upsert on steroids )
2316    #[tracing::instrument(skip_all)]
2317    pub async fn insert_or_update_many(
2318        &self,
2319        config: InsertOrUpdateManyRequest,
2320    ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2321        let envelope = config.to_envelope();
2322        let result = self.send(envelope).await;
2323        match result {
2324            Ok(m) => {
2325                let data = match m.data {
2326                    Some(data) => data,
2327                    None => {
2328                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2329                    }
2330                };
2331                if m.command == "error" {
2332                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2333                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2334                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2335                }
2336                let response: InsertOrUpdateManyResponse =
2337                    prost::Message::decode(data.value.as_ref())
2338                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2339                Ok(response)
2340            }
2341            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2342        }
2343    }
2344    /// Update one or more documents in a collection using a update document
2345    #[tracing::instrument(skip_all)]
2346    pub async fn update_document(
2347        &self,
2348        config: UpdateDocumentRequest,
2349    ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2350        let envelope = config.to_envelope();
2351        let result = self.send(envelope).await;
2352        match result {
2353            Ok(m) => {
2354                let data = match m.data {
2355                    Some(data) => data,
2356                    None => {
2357                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2358                    }
2359                };
2360                if m.command == "error" {
2361                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2362                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2363                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2364                }
2365                let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2366                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2367                Ok(response)
2368            }
2369            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2370        }
2371    }
2372    /// Delete a document from a collection using a unique key
2373    #[tracing::instrument(skip_all)]
2374    pub async fn delete_one(&self, config: DeleteOneRequest) -> Result<i32, OpenIAPError> {
2375        let envelope = config.to_envelope();
2376        let result = self.send(envelope).await;
2377        match result {
2378            Ok(m) => {
2379                let data = match m.data {
2380                    Some(data) => data,
2381                    None => {
2382                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2383                    }
2384                };
2385                if m.command == "error" {
2386                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2387                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2388                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2389                }
2390                let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2391                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2392                Ok(response.affectedrows)
2393            }
2394            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2395        }
2396    }
2397    /// Delete many documents from a collection using a query or list of unique keys
2398    #[tracing::instrument(skip_all)]
2399    pub async fn delete_many(&self, config: DeleteManyRequest) -> Result<i32, OpenIAPError> {
2400        let envelope = config.to_envelope();
2401        let result = self.send(envelope).await;
2402        match result {
2403            Ok(m) => {
2404                let data = match m.data {
2405                    Some(data) => data,
2406                    None => {
2407                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2408                    }
2409                };
2410                if m.command == "error" {
2411                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2412                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2413                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2414                }
2415                let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2416                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2417                Ok(response.affectedrows)
2418            }
2419            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2420        }
2421    }
2422    /// Download a file from the database
2423    #[tracing::instrument(skip_all)]
2424    pub async fn download(
2425        &self,
2426        config: DownloadRequest,
2427        folder: Option<&str>,
2428        filename: Option<&str>,
2429    ) -> Result<DownloadResponse, OpenIAPError> {
2430        let envelope = config.to_envelope();
2431        match self.sendwithstream(envelope).await {
2432            Ok((response_rx, mut stream_rx)) => {
2433                let temp_file_path = util::generate_unique_filename("openiap");
2434                debug!("Temp file: {:?}", temp_file_path);
2435                let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2436                    OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2437                })?;
2438                while !stream_rx.is_closed() {
2439                    match stream_rx.recv().await {
2440                        Some(received) => {
2441                            if received.is_empty() {
2442                                debug!("Stream closed");
2443                                break;
2444                            }
2445                            debug!("Received {} bytes", received.len());
2446                            temp_file.write_all(&received).map_err(|e| {
2447                                OpenIAPError::ClientError(format!(
2448                                    "Failed to write to temp file: {}",
2449                                    e
2450                                ))
2451                            })?;
2452                        }
2453                        None => {
2454                            debug!("Stream closed");
2455                            break;
2456                        }
2457                    }
2458                }
2459                temp_file.sync_all().map_err(|e| {
2460                    OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2461                })?;
2462
2463                let response = response_rx.await.map_err(|_| {
2464                    OpenIAPError::ClientError("Failed to receive response".to_string())
2465                })?;
2466
2467                if response.command == "error" {
2468                    let data = match response.data {
2469                        Some(data) => data,
2470                        None => {
2471                            return Err(OpenIAPError::ClientError(
2472                                "No data returned for SERVER error".to_string(),
2473                            ));
2474                        }
2475                    };
2476                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2477                    return Err(OpenIAPError::ServerError(e.message));
2478                }
2479                let mut downloadresponse: DownloadResponse =
2480                    prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2481
2482                let mut final_filename = match &filename {
2483                    Some(f) => f,
2484                    None => downloadresponse.filename.as_str(),
2485                };
2486                if final_filename.is_empty() {
2487                    final_filename = downloadresponse.filename.as_str();
2488                }
2489                let mut folder = match &folder {
2490                    Some(f) => f,
2491                    None => ".",
2492                };
2493                if folder.is_empty() {
2494                    folder = ".";
2495                }
2496                let filepath = format!("{}/{}", folder, final_filename);
2497                trace!("Moving file to {}", filepath);
2498                util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2499                    OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2500                })?;
2501                debug!("Downloaded file to {}", filepath);
2502                downloadresponse.filename = filepath;
2503
2504                Ok(downloadresponse)
2505            }
2506            Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2507        }
2508    }
2509    /// Upload a file to the database
2510    #[tracing::instrument(skip_all)]
2511    pub async fn upload(
2512        &self,
2513        config: UploadRequest,
2514        filepath: &str,
2515    ) -> Result<UploadResponse, OpenIAPError> {
2516        // debug!("upload: Uploading file: {}", filepath);
2517        // let mut file = File::open(filepath)
2518        //     .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2519        // let chunk_size = 1024 * 1024;
2520        // let mut buffer = vec![0; chunk_size];
2521
2522        // let envelope = config.to_envelope();
2523        // let (response_rx, rid) = self.send_noawait(envelope).await?;
2524        // {
2525        //     let envelope = BeginStream::from_rid(rid.clone());
2526        //     debug!("Sending beginstream to #{}", rid);
2527        //     self.send_envelope(envelope).await.map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2528        //     let mut counter = 0;
2529
2530        //     loop {
2531        //         let bytes_read = file.read(&mut buffer).map_err(|e| {
2532        //             OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2533        //         })?;
2534        //         counter += 1;
2535
2536        //         if bytes_read == 0 {
2537        //             break;
2538        //         }
2539
2540        //         let chunk = buffer[..bytes_read].to_vec();
2541        //         let envelope = Stream::from_rid(chunk, rid.clone());
2542        //         debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2543        //         self.send_envelope(envelope).await.map_err(|e| {
2544        //             OpenIAPError::ClientError(format!("Failed to send data: {}", e))
2545        //         })?
2546        //     }
2547
2548        //     let envelope = EndStream::from_rid(rid.clone());
2549        //     debug!("Sending endstream to #{}", rid);
2550        //     self.send_envelope(envelope).await
2551        //         .map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2552        // }
2553
2554        // debug!("Wait for upload response for #{}", rid);
2555        // match response_rx.await {
2556        //     Ok(response) => {
2557        //         if response.command == "error" {
2558        //             let error_response: ErrorResponse = prost::Message::decode(
2559        //                 response.data.unwrap().value.as_ref(),
2560        //             )
2561        //             .map_err(|e| {
2562        //                 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2563        //             })?;
2564        //             return Err(OpenIAPError::ServerError(error_response.message));
2565        //         }
2566        //         let upload_response: UploadResponse =
2567        //             prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2568        //                 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2569        //             })?;
2570        //         Ok(upload_response)
2571        //     }
2572        //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2573        // }
2574        debug!("upload: Uploading file: {}", filepath);
2575        let mut file = File::open(filepath)
2576            .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2577        let chunk_size = 1024 * 1024;
2578        let mut buffer = vec![0; chunk_size];
2579    
2580        // Send the initial upload request
2581        let envelope = config.to_envelope();
2582        let (response_rx, rid) = self.send_noawait(envelope).await?;
2583        
2584        // Send the BeginStream message
2585        let envelope = BeginStream::from_rid(rid.clone());
2586        debug!("Sending beginstream to #{}", rid);
2587        if let Err(e) = self.send_envelope(envelope).await {
2588            let inner = self.inner.lock().await;
2589            inner.queries.lock().await.remove(&rid);
2590            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2591        }
2592    
2593        // Send file chunks
2594        let mut counter = 0;
2595        loop {
2596            let bytes_read = file.read(&mut buffer).map_err(|e| {
2597                OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2598            })?;
2599            counter += 1;
2600    
2601            if bytes_read == 0 {
2602                break;
2603            }
2604    
2605            let chunk = buffer[..bytes_read].to_vec();
2606            let envelope = Stream::from_rid(chunk, rid.clone());
2607            debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2608            if let Err(e) = self.send_envelope(envelope).await {
2609                let inner = self.inner.lock().await;
2610                inner.queries.lock().await.remove(&rid);
2611                return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2612            }
2613        }
2614    
2615        // Send the EndStream message
2616        let envelope = EndStream::from_rid(rid.clone());
2617        debug!("Sending endstream to #{}", rid);
2618        if let Err(e) = self.send_envelope(envelope).await {
2619            let inner = self.inner.lock().await;
2620            inner.queries.lock().await.remove(&rid);
2621            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2622        }
2623    
2624        // Await the response and clean up `inner.queries` afterward
2625        debug!("Wait for upload response for #{}", rid);
2626        let result = response_rx.await;
2627        let inner = self.inner.lock().await;
2628        inner.queries.lock().await.remove(&rid);
2629    
2630        match result {
2631            Ok(response) => {
2632                if response.command == "error" {
2633                    let error_response: ErrorResponse = prost::Message::decode(
2634                        response.data.unwrap().value.as_ref(),
2635                    )
2636                    .map_err(|e| {
2637                        OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2638                    })?;
2639                    return Err(OpenIAPError::ServerError(error_response.message));
2640                }
2641                let upload_response: UploadResponse =
2642                    prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2643                        OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2644                    })?;
2645                Ok(upload_response)
2646            }
2647            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2648        }
2649    }
2650    /// Watch for changes in a collection ( change stream )
2651    #[tracing::instrument(skip_all)]
2652    pub async fn watch(
2653        &self,
2654        mut config: WatchRequest,
2655        callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2656    ) -> Result<String, OpenIAPError> {
2657        if config.collectionname.is_empty() {
2658            config.collectionname = "entities".to_string();
2659        }
2660        if config.paths.is_empty() {
2661            config.paths = vec!["".to_string()];
2662        }
2663
2664        let envelope = config.to_envelope();
2665        let result = self.send(envelope).await;
2666        match result {
2667            Ok(m) => {
2668                let data = match m.data {
2669                    Some(data) => data,
2670                    None => {
2671                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2672                    }
2673                };
2674                if m.command == "error" {
2675                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2676                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2677                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2678                }
2679                let response: WatchResponse = prost::Message::decode(data.value.as_ref())
2680                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2681
2682                let inner = self.inner.lock().await;
2683                inner
2684                    .watches
2685                    .lock()
2686                    .await
2687                    .insert(response.id.clone(), callback);
2688
2689                Ok(response.id)
2690            }
2691            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2692        }
2693    }
2694    /// Cancel a watch ( change stream )
2695    #[tracing::instrument(skip_all)]
2696    pub async fn unwatch(&self, id: &str) -> Result<(), OpenIAPError> {
2697        let config = UnWatchRequest::byid(id);
2698        let envelope = config.to_envelope();
2699        let result = self.send(envelope).await;
2700        match result {
2701            Ok(m) => {
2702                let data = match m.data {
2703                    Some(data) => data,
2704                    None => {
2705                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2706                    }
2707                };
2708                if m.command == "error" {
2709                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2710                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2711                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2712                }
2713                Ok(())
2714            }
2715            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2716        }
2717    }
2718    /// Register a queue for messaging ( amqp ) in the OpenIAP service
2719    #[tracing::instrument(skip_all)]
2720    pub async fn register_queue(
2721        &self,
2722        mut config: RegisterQueueRequest,
2723        callback: Box<dyn Fn(QueueEvent) + Send + Sync>,
2724    ) -> Result<String, OpenIAPError> {
2725        if config.queuename.is_empty() {
2726            config.queuename = "".to_string();
2727        }
2728
2729        let envelope = config.to_envelope();
2730        let result = self.send(envelope).await;
2731        match result {
2732            Ok(m) => {
2733                let data = match m.data {
2734                    Some(data) => data,
2735                    None => {
2736                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2737                    }
2738                };
2739                if m.command == "error" {
2740                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2741                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2742                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2743                }
2744                let response: RegisterQueueResponse =
2745                    prost::Message::decode(data.value.as_ref())
2746                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2747
2748                let inner = self.inner.lock().await;
2749                inner
2750                    .queues
2751                    .lock()
2752                    .await
2753                    .insert(response.queuename.clone(), callback);
2754
2755                Ok(response.queuename)
2756            }
2757            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2758        }
2759    }
2760    /// Unregister a queue or exchange for messaging ( amqp ) in the OpenIAP service
2761    #[tracing::instrument(skip_all)]
2762    pub async fn unregister_queue(&self, queuename: &str) -> Result<(), OpenIAPError> {
2763        let config = UnRegisterQueueRequest::byqueuename(queuename);
2764        let envelope = config.to_envelope();
2765        let result = self.send(envelope).await;
2766        match result {
2767            Ok(m) => {
2768                let data = match m.data {
2769                    Some(data) => data,
2770                    None => {
2771                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2772                    }
2773                };
2774                if m.command == "error" {
2775                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2776                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2777                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2778                }
2779                Ok(())
2780            }
2781            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2782        }
2783    }
2784    /// Register a exchange for messaging ( amqp ) in the OpenIAP service
2785    #[tracing::instrument(skip_all)]
2786    pub async fn register_exchange(
2787        &self,
2788        mut config: RegisterExchangeRequest,
2789        callback: Box<dyn Fn(QueueEvent) + Send + Sync>,
2790    ) -> Result<String, OpenIAPError> {
2791        if config.exchangename.is_empty() {
2792            return Err(OpenIAPError::ClientError(
2793                "No exchange name provided".to_string(),
2794            ));
2795        }
2796        if config.algorithm.is_empty() {
2797            config.algorithm = "fanout".to_string();
2798        }
2799        let envelope = config.to_envelope();
2800        let result = self.send(envelope).await;
2801        match result {
2802            Ok(m) => {
2803                let data = match m.data {
2804                    Some(data) => data,
2805                    None => {
2806                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2807                    }
2808                };
2809                if m.command == "error" {
2810                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2811                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2812                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2813                }
2814                let response: RegisterExchangeResponse =
2815                    prost::Message::decode(data.value.as_ref())
2816                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2817                if !response.queuename.is_empty() {
2818                    let inner = self.inner.lock().await;
2819                    inner
2820                        .queues
2821                        .lock()
2822                        .await
2823                        .insert(response.queuename.clone(), callback);
2824                }
2825                Ok(response.queuename)
2826            }
2827            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2828        }
2829    }
2830    /// Send a message to a queue or exchange in the OpenIAP service
2831    #[tracing::instrument(skip_all)]
2832    pub async fn queue_message(
2833        &self,
2834        config: QueueMessageRequest,
2835    ) -> Result<QueueMessageResponse, OpenIAPError> {
2836        if config.queuename.is_empty() && config.exchangename.is_empty() {
2837            return Err(OpenIAPError::ClientError(
2838                "No queue or exchange name provided".to_string(),
2839            ));
2840        }
2841        let envelope = config.to_envelope();
2842        let result = self.send(envelope).await;
2843        match result {
2844            Ok(m) => {
2845                let data = match m.data {
2846                    Some(d) => d,
2847                    None => {
2848                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
2849                    }
2850                };
2851                if m.command == "error" {
2852                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2853                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2854                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2855                }
2856                let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
2857                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2858                Ok(response)
2859            }
2860            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2861        }
2862    }
2863    /// Send message to a queue or exchange in the OpenIAP service, and wait for a reply
2864    #[tracing::instrument(skip_all)]
2865    pub async fn rpc(&self, mut config: QueueMessageRequest) -> Result<String, OpenIAPError> {
2866        if config.queuename.is_empty() && config.exchangename.is_empty() {
2867            return Err(OpenIAPError::ClientError(
2868                "No queue or exchange name provided".to_string(),
2869            ));
2870        }
2871
2872        let (tx, rx) = oneshot::channel::<String>();
2873        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
2874
2875        let q = self
2876            .register_queue(
2877                RegisterQueueRequest {
2878                    queuename: "".to_string(),
2879                },
2880                Box::new(move |event| {
2881                    let tx = tx.lock().unwrap().take().unwrap();
2882                    tx.send(event.data).unwrap();
2883                }),
2884            )
2885            .await
2886            .unwrap();
2887
2888        config.replyto = q.clone();
2889        let envelope = config.to_envelope();
2890
2891        let result = self.send(envelope).await;
2892        match result {
2893            Ok(m) => {
2894                let data = match m.data {
2895                    Some(d) => d,
2896                    None => {
2897                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
2898                    }
2899                };
2900                if m.command == "error" {
2901                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2902                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2903                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2904                }
2905                // prost::Message::decode(data.value.as_ref())
2906                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2907
2908                let response = rx.await.unwrap();
2909
2910                let ur_response = self.unregister_queue(&q).await;
2911                match ur_response {
2912                    Ok(_) => {
2913                        debug!("Unregistered Response Queue: {:?}", q);
2914                    }
2915                    Err(e) => {
2916                        error!("Failed to unregister Response Queue: {:?}", e);
2917                    }
2918                }
2919
2920                Ok(response)
2921            }
2922            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2923        }
2924    }
2925    /// Push a new workitem to a workitem queue
2926    /// If the file is less than 5 megabytes it will be attached to the workitem
2927    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
2928    #[tracing::instrument(skip_all)]
2929    pub async fn push_workitem(
2930        &self,
2931        mut config: PushWorkitemRequest,
2932    ) -> Result<PushWorkitemResponse, OpenIAPError> {
2933        if config.wiq.is_empty() && config.wiqid.is_empty() {
2934            return Err(OpenIAPError::ClientError(
2935                "No queue name or id provided".to_string(),
2936            ));
2937        }
2938        for f in &mut config.files {
2939            if f.filename.is_empty() && f.file.is_empty() {
2940                debug!("Filename is empty");
2941            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
2942                // does file exist?
2943                if !std::path::Path::new(&f.filename).exists() {
2944                    debug!("File does not exist: {}", f.filename);
2945                } else {
2946                    let filesize = std::fs::metadata(&f.filename).unwrap().len();
2947                    // if filesize is less than 5 meggabytes attach it, else upload
2948                    if filesize < 5 * 1024 * 1024 {
2949                        debug!("File {} exists so ATTACHING it.", f.filename);
2950                        let filename = std::path::Path::new(&f.filename)
2951                            .file_name()
2952                            .unwrap()
2953                            .to_str()
2954                            .unwrap();
2955                        f.file = std::fs::read(&f.filename).unwrap();
2956                        // f.file = compress_file(&f.filename).unwrap();
2957                        // f.compressed = false;
2958                        f.file = util::compress_file_to_vec(&f.filename).unwrap();
2959                        f.compressed = true;
2960                        f.filename = filename.to_string();
2961                        f.id = "findme".to_string();
2962                        trace!(
2963                            "File {} was read and assigned to f.file, size: {}",
2964                            f.filename,
2965                            f.file.len()
2966                        );
2967                    } else {
2968                        debug!("File {} exists so UPLOADING it.", f.filename);
2969                        let filename = std::path::Path::new(&f.filename)
2970                            .file_name()
2971                            .unwrap()
2972                            .to_str()
2973                            .unwrap();
2974                        let uploadconfig = UploadRequest {
2975                            filename: filename.to_string(),
2976                            collectionname: "fs.files".to_string(),
2977                            ..Default::default()
2978                        };
2979                        let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
2980                        trace!("File {} was upload as {}", filename, uploadresult.id);
2981                        // f.filename = "".to_string();
2982                        f.id = uploadresult.id.clone();
2983                        f.filename = filename.to_string();
2984                    }
2985                }
2986            } else {
2987                debug!("File {} is already uploaded", f.filename);
2988            }
2989        }
2990        let envelope = config.to_envelope();
2991        let result = self.send(envelope).await;
2992        match result {
2993            Ok(m) => {
2994                let data = match m.data {
2995                    Some(data) => data,
2996                    None => {
2997                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2998                    }
2999                };
3000                if m.command == "error" {
3001                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3002                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3003                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3004                }
3005                let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3006                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3007                Ok(response)
3008            }
3009            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3010        }
3011    }
3012    /// Push multiple workitems to a workitem queue
3013    /// If the file is less than 5 megabytes it will be attached to the workitem
3014    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3015    #[tracing::instrument(skip_all)]
3016    pub async fn push_workitems(
3017        &self,
3018        mut config: PushWorkitemsRequest,
3019    ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3020        if config.wiq.is_empty() && config.wiqid.is_empty() {
3021            return Err(OpenIAPError::ClientError(
3022                "No queue name or id provided".to_string(),
3023            ));
3024        }
3025        for wi in &mut config.items {
3026            for f in &mut wi.files {
3027                if f.filename.is_empty() && f.file.is_empty() {
3028                    debug!("Filename is empty");
3029                } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3030                    // does file exist?
3031                    if !std::path::Path::new(&f.filename).exists() {
3032                        debug!("File does not exist: {}", f.filename);
3033                    } else {
3034                        let filesize = std::fs::metadata(&f.filename).unwrap().len();
3035                        // if filesize is less than 5 meggabytes attach it, else upload
3036                        if filesize < 5 * 1024 * 1024 {
3037                            debug!("File {} exists so ATTACHING it.", f.filename);
3038                            let filename = std::path::Path::new(&f.filename)
3039                                .file_name()
3040                                .unwrap()
3041                                .to_str()
3042                                .unwrap();
3043                            f.file = std::fs::read(&f.filename).unwrap();
3044                            // f.file = compress_file(&f.filename).unwrap();
3045                            // f.compressed = false;
3046                            f.file = util::compress_file_to_vec(&f.filename).unwrap();
3047                            f.compressed = true;
3048                            f.filename = filename.to_string();
3049                            f.id = "findme".to_string();
3050                            trace!(
3051                                "File {} was read and assigned to f.file, size: {}",
3052                                f.filename,
3053                                f.file.len()
3054                            );
3055                        } else {
3056                            debug!("File {} exists so UPLOADING it.", f.filename);
3057                            let filename = std::path::Path::new(&f.filename)
3058                                .file_name()
3059                                .unwrap()
3060                                .to_str()
3061                                .unwrap();
3062                            let uploadconfig = UploadRequest {
3063                                filename: filename.to_string(),
3064                                collectionname: "fs.files".to_string(),
3065                                ..Default::default()
3066                            };
3067                            let uploadresult =
3068                                self.upload(uploadconfig, &f.filename).await.unwrap();
3069                            trace!("File {} was upload as {}", filename, uploadresult.id);
3070                            // f.filename = "".to_string();
3071                            f.id = uploadresult.id.clone();
3072                            f.filename = filename.to_string();
3073                        }
3074                    }
3075                } else {
3076                    debug!("File {} is already uploaded", f.filename);
3077                }
3078            }
3079        }
3080        let envelope = config.to_envelope();
3081        let result = self.send(envelope).await;
3082        match result {
3083            Ok(m) => {
3084                let data = match m.data {
3085                    Some(data) => data,
3086                    None => {
3087                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3088                    }
3089                };
3090                if m.command == "error" {
3091                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3092                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3093                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3094                }
3095                let response: PushWorkitemsResponse =
3096                    prost::Message::decode(data.value.as_ref())
3097                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3098                Ok(response)
3099            }
3100            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3101        }
3102    }
3103    /// Pop a workitem from a workitem queue, return None if no workitem is available
3104    /// Any files attached to the workitem will be downloaded to the downloadfolder ( default "." )
3105    #[tracing::instrument(skip_all)]
3106    pub async fn pop_workitem(
3107        &self,
3108        config: PopWorkitemRequest,
3109        downloadfolder: Option<&str>,
3110    ) -> Result<PopWorkitemResponse, OpenIAPError> {
3111        if config.wiq.is_empty() && config.wiqid.is_empty() {
3112            return Err(OpenIAPError::ClientError(
3113                "No queue name or id provided".to_string(),
3114            ));
3115        }
3116        let envelope = config.to_envelope();
3117        let result = self.send(envelope).await;
3118        match result {
3119            Ok(m) => {
3120                let data = match m.data {
3121                    Some(data) => data,
3122                    None => {
3123                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3124                    }
3125                };
3126                if m.command == "error" {
3127                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3128                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3129                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3130                }
3131                let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3132                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3133
3134                match &response.workitem {
3135                    Some(wi) => {
3136                        for f in &wi.files {
3137                            if !f.id.is_empty() {
3138                                let downloadconfig = DownloadRequest {
3139                                    id: f.id.clone(),
3140                                    collectionname: "fs.files".to_string(),
3141                                    ..Default::default()
3142                                };
3143                                let downloadresult =
3144                                    match self.download(downloadconfig, downloadfolder, None).await
3145                                    {
3146                                        Ok(r) => r,
3147                                        Err(e) => {
3148                                            debug!("Failed to download file: {}", e);
3149                                            continue;
3150                                        }
3151                                    };
3152                                debug!(
3153                                    "File {} was downloaded as {}",
3154                                    f.filename, downloadresult.filename
3155                                );
3156                            }
3157                        }
3158                    }
3159                    None => {
3160                        debug!("No workitem found");
3161                    }
3162                }
3163                Ok(response)
3164            }
3165            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3166        }
3167    }
3168    /// Update a workitem in a workitem queue
3169    /// If the file is less than 5 megabytes it will be attached to the workitem
3170    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3171    /// If a fileid is provided it will be used to update the file
3172    /// if a filename is provided without the id, it will be deleted
3173    #[tracing::instrument(skip_all)]
3174    pub async fn update_workitem(
3175        &self,
3176        mut config: UpdateWorkitemRequest,
3177    ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3178        match &config.workitem {
3179            Some(wiq) => {
3180                if wiq.id.is_empty() {
3181                    return Err(OpenIAPError::ClientError(
3182                        "No workitem id provided".to_string(),
3183                    ));
3184                }
3185            }
3186            None => {
3187                return Err(OpenIAPError::ClientError(
3188                    "No workitem provided".to_string(),
3189                ));
3190            }
3191        }
3192        for f in &mut config.files {
3193            if f.filename.is_empty() && f.file.is_empty() {
3194                debug!("Filename is empty");
3195            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3196                if !std::path::Path::new(&f.filename).exists() {
3197                    debug!("File does not exist: {}", f.filename);
3198                } else {
3199                    debug!("File {} exists so uploading it.", f.filename);
3200                    let filename = std::path::Path::new(&f.filename)
3201                        .file_name()
3202                        .unwrap()
3203                        .to_str()
3204                        .unwrap();
3205                    let uploadconfig = UploadRequest {
3206                        filename: filename.to_string(),
3207                        collectionname: "fs.files".to_string(),
3208                        ..Default::default()
3209                    };
3210                    let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3211                    trace!("File {} was upload as {}", filename, uploadresult.id);
3212                    f.id = uploadresult.id.clone();
3213                    f.filename = filename.to_string();
3214                }
3215            } else {
3216                debug!("Skipped file");
3217            }
3218        }
3219        let envelope = config.to_envelope();
3220        let result = self.send(envelope).await;
3221        match result {
3222            Ok(m) => {
3223                let data = match m.data {
3224                    Some(d) => d,
3225                    None => {
3226                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3227                    }
3228                };
3229                if m.command == "error" {
3230                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3231                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3232                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3233                }
3234                let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3235                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3236                Ok(response)
3237            }
3238            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3239        }
3240    }
3241    /// Delete a workitem from a workitem queue
3242    #[tracing::instrument(skip_all)]
3243    pub async fn delete_workitem(
3244        &self,
3245        config: DeleteWorkitemRequest,
3246    ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3247        if config.id.is_empty() {
3248            return Err(OpenIAPError::ClientError(
3249                "No workitem id provided".to_string(),
3250            ));
3251        }
3252        let envelope = config.to_envelope();
3253        let result = self.send(envelope).await;
3254        match result {
3255            Ok(m) => {
3256                let data = match m.data {
3257                    Some(d) => d,
3258                    None => {
3259                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3260                    }
3261                };
3262                if m.command == "error" {
3263                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3264                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3265                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3266                }
3267                let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3268                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3269                Ok(response)
3270            }
3271            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3272        }
3273    }
3274    /// Add a workitem queue to openiap instance
3275    #[tracing::instrument(skip_all)]
3276    pub async fn add_workitem_queue(
3277        &self,
3278        config: AddWorkItemQueueRequest,
3279    ) -> Result<WorkItemQueue, OpenIAPError> {
3280        if config.workitemqueue.is_none() {
3281            return Err(OpenIAPError::ClientError(
3282                "No workitem queue name provided".to_string(),
3283            ));
3284        }
3285        let envelope = config.to_envelope();
3286        let result = self.send(envelope).await;
3287        match result {
3288            Ok(m) => {
3289                let data = match m.data {
3290                    Some(d) => d,
3291                    None => {
3292                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3293                    }
3294                };
3295                if m.command == "error" {
3296                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3297                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3298                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3299                }
3300                let response: AddWorkItemQueueResponse =
3301                    prost::Message::decode(data.value.as_ref())
3302                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3303                match response.workitemqueue {
3304                    Some(wiq) => Ok(wiq),
3305                    None => {
3306                        return Err(OpenIAPError::ClientError(
3307                            "No workitem queue returned".to_string(),
3308                        ));
3309                    }
3310                }
3311            }
3312            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3313        }
3314    }
3315    /// Update a workitem queue in openiap instance
3316    #[tracing::instrument(skip_all)]
3317    pub async fn update_workitem_queue(
3318        &self,
3319        config: UpdateWorkItemQueueRequest,
3320    ) -> Result<WorkItemQueue, OpenIAPError> {
3321        if config.workitemqueue.is_none() {
3322            return Err(OpenIAPError::ClientError(
3323                "No workitem queue name provided".to_string(),
3324            ));
3325        }
3326        let envelope = config.to_envelope();
3327        let result = self.send(envelope).await;
3328        match result {
3329            Ok(m) => {
3330                let data = match m.data {
3331                    Some(d) => d,
3332                    None => {
3333                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3334                    }
3335                };
3336                if m.command == "error" {
3337                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3338                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3339                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3340                }
3341                let response: UpdateWorkItemQueueResponse =
3342                    prost::Message::decode(data.value.as_ref())
3343                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3344                match response.workitemqueue {
3345                    Some(wiq) => Ok(wiq),
3346                    None => {
3347                        return Err(OpenIAPError::ClientError(
3348                            "No workitem queue returned".to_string(),
3349                        ));
3350                    }
3351                }
3352            }
3353            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3354        }
3355    }
3356    /// Delete a workitem queue from openiap instance
3357    #[tracing::instrument(skip_all)]
3358    pub async fn delete_workitem_queue(
3359        &self,
3360        config: DeleteWorkItemQueueRequest,
3361    ) -> Result<(), OpenIAPError> {
3362        if config.wiq.is_empty() && config.wiqid.is_empty() {
3363            return Err(OpenIAPError::ClientError(
3364                "No workitem queue name or id provided".to_string(),
3365            ));
3366        }
3367        let envelope = config.to_envelope();
3368        let result = self.send(envelope).await;
3369        match result {
3370            Ok(m) => {
3371                let data = match m.data {
3372                    Some(d) => d,
3373                    None => {
3374                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3375                    }
3376                };
3377                if m.command == "error" {
3378                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3379                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3380                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3381                }
3382                Ok(())
3383            }
3384            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3385        }
3386    }
3387    /// Run custom command on server. Custom commands are commands who is "on trail", they may change and are not ready to be moved to the fixed protobuf format yet
3388    #[tracing::instrument(skip_all)]
3389    pub async fn custom_command(
3390        &self,
3391        config: CustomCommandRequest,
3392    ) -> Result<String, OpenIAPError> {
3393        if config.command.is_empty() {
3394            return Err(OpenIAPError::ClientError("No command provided".to_string()));
3395        }
3396        let envelope = config.to_envelope();
3397        let result = self.send(envelope).await;
3398        match result {
3399            Ok(m) => {
3400                let data = match m.data {
3401                    Some(d) => d,
3402                    None => {
3403                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3404                    }
3405                };
3406                if m.command == "error" {
3407                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3408                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3409                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3410                }
3411                let response: CustomCommandResponse =
3412                    prost::Message::decode(data.value.as_ref())
3413                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3414                Ok(response.result)
3415            }
3416            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3417        }
3418    }
3419    /// Delete a package from the database, cleaning up all all files and data
3420    #[tracing::instrument(skip_all)]
3421    pub async fn delete_package(&self, packageid: &str) -> Result<(), OpenIAPError> {
3422        let config = DeletePackageRequest::byid(packageid);
3423        let envelope = config.to_envelope();
3424        let result = self.send(envelope).await;
3425        match result {
3426            Ok(m) => {
3427                let data = match m.data {
3428                    Some(data) => data,
3429                    None => {
3430                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3431                    }
3432                };
3433                if m.command == "error" {
3434                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3435                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3436                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3437                }
3438                // prost::Message::decode(data.value.as_ref())
3439                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3440                Ok(())
3441            }
3442            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3443        }
3444    }
3445    /// Start Agent
3446    #[tracing::instrument(skip_all)]
3447    pub async fn start_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3448        let config = StartAgentRequest::byid(agentid);
3449        let envelope = config.to_envelope();
3450        let result = self.send(envelope).await;
3451        match result {
3452            Ok(m) => {
3453                let data = match m.data {
3454                    Some(d) => d,
3455                    None => {
3456                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3457                    }
3458                };
3459                if m.command == "error" {
3460                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3461                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3462                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3463                }
3464                // prost::Message::decode(data.value.as_ref())
3465                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3466                Ok(())
3467            }
3468            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3469        }
3470    }
3471    /// Stop an agent, this will cleanup all resources and stop the agent
3472    #[tracing::instrument(skip_all)]
3473    pub async fn stop_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3474        let config = StopAgentRequest::byid(agentid);
3475        let envelope = config.to_envelope();
3476        let result = self.send(envelope).await;
3477        match result {
3478            Ok(m) => {
3479                let data = match m.data {
3480                    Some(d) => d,
3481                    None => {
3482                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3483                    }
3484                };
3485                if m.command == "error" {
3486                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3487                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3488                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3489                }
3490                // prost::Message::decode(data.value.as_ref())
3491                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3492                Ok(())
3493            }
3494            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3495        }
3496    }
3497    /// Delete a pod from an agent, on kubernetes this will remove the pod and kubernetes will re-create it, on docker this will remove the pod. Then use start_agent to start the agent again
3498    #[tracing::instrument(skip_all)]
3499    pub async fn delete_agent_pod(&self, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
3500        let config = DeleteAgentPodRequest::byid(agentid, podname);
3501        let envelope = config.to_envelope();
3502        let result = self.send(envelope).await;
3503        match result {
3504            Ok(m) => {
3505                let data = match m.data {
3506                    Some(d) => d,
3507                    None => {
3508                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3509                    }
3510                };
3511                if m.command == "error" {
3512                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3513                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3514                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3515                }
3516                // prost::Message::decode(data.value.as_ref())
3517                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3518                Ok(())
3519            }
3520            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3521        }
3522    }
3523    /// Delete an agent, this will cleanup all resources and delete the agent
3524    #[tracing::instrument(skip_all)]
3525    pub async fn delete_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3526        let config = DeleteAgentRequest::byid(agentid);
3527        let envelope = config.to_envelope();
3528        let result = self.send(envelope).await;
3529        match result {
3530            Ok(m) => {
3531                let data = match m.data {
3532                    Some(d) => d,
3533                    None => {
3534                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3535                    }
3536                };
3537                if m.command == "error" {
3538                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3539                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3540                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3541                }
3542                // prost::Message::decode(data.value.as_ref())
3543                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3544                Ok(())
3545            }
3546            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3547        }
3548    }
3549    /// Get all pods associated with an agent, if stats is true, it will return memory and cpu usage for each pod
3550    #[tracing::instrument(skip_all)]
3551    pub async fn get_agent_pods(&self, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
3552        let config = GetAgentPodsRequest::byid(agentid, stats);
3553        let envelope = config.to_envelope();
3554        let result = self.send(envelope).await;
3555        match result {
3556            Ok(m) => {
3557                let data = match m.data {
3558                    Some(d) => d,
3559                    None => {
3560                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3561                    }
3562                };
3563                if m.command == "error" {
3564                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3565                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3566                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3567                }
3568                let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
3569                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3570                Ok(response.results)
3571            }
3572            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3573        }
3574    }
3575    /// Get logs from a pod associated with an agent, leave podname empty to get logs from all pods
3576    #[tracing::instrument(skip_all)]
3577    pub async fn get_agent_pod_logs(
3578        &self,
3579        agentid: &str,
3580        podname: &str,
3581    ) -> Result<String, OpenIAPError> {
3582        let config = GetAgentLogRequest::new(agentid, podname);
3583        let envelope = config.to_envelope();
3584        let result = self.send(envelope).await;
3585        match result {
3586            Ok(m) => {
3587                let data = match m.data {
3588                    Some(d) => d,
3589                    None => {
3590                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3591                    }
3592                };
3593                if m.command == "error" {
3594                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3595                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3596                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3597                }
3598                let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
3599                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3600                Ok(response.result)
3601            }
3602            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3603        }
3604    }
3605
3606    /// Create/update a customer in the OpenIAP service. If stripe has been configured, it will create or update a customer in stripe as well
3607    /// A customer is a customer object that can only be updated using this function, and 2 roles ( customername admins and customername users )
3608    #[tracing::instrument(skip_all)]
3609    pub async fn ensure_customer(
3610        &self,
3611        config: EnsureCustomerRequest,
3612    ) -> Result<EnsureCustomerResponse, OpenIAPError> {
3613        if config.customer.is_none() && config.stripe.is_none() {
3614            return Err(OpenIAPError::ClientError(
3615                "No customer or stripe provided".to_string(),
3616            ));
3617        }
3618        let envelope = config.to_envelope();
3619        let result = self.send(envelope).await;
3620        match result {
3621            Ok(m) => {
3622                let data = match m.data {
3623                    Some(d) => d,
3624                    None => {
3625                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3626                    }
3627                };
3628                if m.command == "error" {
3629                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3630                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3631                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3632                }
3633                let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
3634                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3635                Ok(response)
3636            }
3637            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3638        }
3639    }
3640    /// Create a new workflow instance, to be used to workflow in/out nodes in NodeRED
3641    #[tracing::instrument(skip_all)]
3642    pub async fn create_workflow_instance(
3643        &self,
3644        config: CreateWorkflowInstanceRequest,
3645    ) -> Result<String, OpenIAPError> {
3646        if config.workflowid.is_empty() {
3647            return Err(OpenIAPError::ClientError(
3648                "No workflow id provided".to_string(),
3649            ));
3650        }
3651        let envelope = config.to_envelope();
3652        let result = self.send(envelope).await;
3653        match result {
3654            Ok(m) => {
3655                let data = match m.data {
3656                    Some(d) => d,
3657                    None => {
3658                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3659                    }
3660                };
3661                if m.command == "error" {
3662                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3663                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3664                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3665                }
3666                let response: CreateWorkflowInstanceResponse =
3667                    prost::Message::decode(data.value.as_ref())
3668                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3669                Ok(response.instanceid)
3670            }
3671            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3672        }
3673    }
3674
3675    /// Invoke a workflow in the OpenRPA robot where robotid is the userid of the user the robot is running as, or a roleid with RPA enabled
3676    #[tracing::instrument(skip_all)]
3677    pub async fn invoke_openrpa(
3678        &self,
3679        config: InvokeOpenRpaRequest,
3680    ) -> Result<String, OpenIAPError> {
3681        if config.robotid.is_empty() {
3682            return Err(OpenIAPError::ClientError(
3683                "No robot id provided".to_string(),
3684            ));
3685        }
3686        if config.workflowid.is_empty() {
3687            return Err(OpenIAPError::ClientError(
3688                "No workflow id provided".to_string(),
3689            ));
3690        }
3691
3692        let (tx, rx) = oneshot::channel::<String>();
3693        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3694
3695        let q = self
3696            .register_queue(
3697                RegisterQueueRequest {
3698                    queuename: "".to_string(),
3699                },
3700                Box::new(move |event| {
3701                    let json = event.data.clone();
3702                    let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3703                    let command: String = obj["command"].as_str().unwrap().to_string();
3704                    debug!("Received event: {:?}", event);
3705                    if command.eq("invokesuccess") {
3706                        debug!("Robot successfully started running workflow");
3707                    } else if command.eq("invokeidle") {
3708                        debug!("Workflow went idle");
3709                    } else if command.eq("invokeerror") {
3710                        debug!("Robot failed to run workflow");
3711                        let tx = tx.lock().unwrap().take().unwrap();
3712                        tx.send(event.data).unwrap();
3713                    } else if command.eq("timeout") {
3714                        debug!("No robot picked up the workflow");
3715                        let tx = tx.lock().unwrap().take().unwrap();
3716                        tx.send(event.data).unwrap();
3717                    } else if command.eq("invokecompleted") {
3718                        debug!("Robot completed running workflow");
3719                        let tx = tx.lock().unwrap().take().unwrap();
3720                        tx.send(event.data).unwrap();
3721                    } else {
3722                        let tx = tx.lock().unwrap().take().unwrap();
3723                        tx.send(event.data).unwrap();
3724                    }
3725                }),
3726            )
3727            .await
3728            .unwrap();
3729        debug!("Registered Response Queue: {:?}", q);
3730        let data = format!(
3731            "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"payload\": {}}}",
3732            config.workflowid, config.payload
3733        );
3734        debug!("Send Data: {}", data);
3735        debug!("To Queue: {} With reply to: {}", config.robotid, q);
3736        let config = QueueMessageRequest {
3737            queuename: config.robotid.clone(),
3738            replyto: q.clone(),
3739            data,
3740            ..Default::default()
3741        };
3742
3743        let envelope = config.to_envelope();
3744
3745        let result = self.send(envelope).await;
3746        match result {
3747            Ok(m) => {
3748                let data = match m.data {
3749                    Some(d) => d,
3750                    None => {
3751                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
3752                    }
3753                };
3754                if m.command == "error" {
3755                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3756                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3757                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3758                }
3759                // prost::Message::decode(data.value.as_ref())
3760                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3761
3762                let json = rx.await.unwrap();
3763                debug!("Received json result: {:?}", json);
3764                let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3765                let command: String = obj["command"].as_str().unwrap().to_string();
3766                let mut data = "".to_string();
3767                if obj["data"].as_str().is_some() {
3768                    data = obj["data"].as_str().unwrap().to_string();
3769                } else if obj["data"].as_object().is_some() {
3770                    data = obj["data"].to_string();
3771                }
3772                if !command.eq("invokecompleted") {
3773                    if command.eq("timeout") {
3774                        return Err(OpenIAPError::ServerError("Timeout".to_string()));
3775                    } else {
3776                        if data.is_empty() {
3777                            return Err(OpenIAPError::ServerError(
3778                                "Error with no message".to_string(),
3779                            ));
3780                        }
3781                        return Err(OpenIAPError::ServerError(data));
3782                    }
3783                }
3784                let response = self.unregister_queue(&q).await;
3785                match response {
3786                    Ok(_) => {
3787                        debug!("Unregistered Response Queue: {:?}", q);
3788                    }
3789                    Err(e) => {
3790                        error!("Failed to unregister Response Queue: {:?}", e);
3791                    }
3792                }
3793                Ok(data)
3794            }
3795            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3796        }
3797    }
3798}
3799