openiap_client/
lib.rs

1#![warn(missing_docs)]
2//! The `openiap.client` crate provides the [Client] struct and its methods.
3//! For now this is only the GRPC and WebSocket client, later we will add a web rest, named pipe and TCP client as well.
4//! Initialize a new client, by calling the [Client::new_connect] method.
5//! ```
6//! use openiap_client::{OpenIAPError, Client, QueryRequest};
7//! #[tokio::main]
8//! async fn main() -> Result<(), OpenIAPError> {
9//!     let client = Client::new_connect("").await?;
10//!     let q = client.query( QueryRequest::with_projection(
11//!         "entities",
12//!         "{}",
13//!         "{\"name\":1}"
14//!     )).await?;
15//!     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
16//!     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
17//!     for item in items {
18//!         println!("Item: {:?}", item);
19//!     }
20//!     Ok(())
21//! }
22//! ```
23
24pub use openiap_proto::errors::*;
25pub use openiap_proto::openiap::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use openiap::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30use std::fmt::{Display,Formatter};
31use tokio::task::JoinHandle;
32use tokio_tungstenite::WebSocketStream;
33use tracing::{debug, error, info, trace};
34type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
35type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{set_otel_url, enable_tracing, disable_tracing};
55pub use crate::otel::{set_f64_observable_gauge, set_u64_observable_gauge, set_i64_observable_gauge, disable_observable_gauge};
56
57type QuerySender = oneshot::Sender<Envelope>;
58type StreamSender = mpsc::Sender<Vec<u8>>;
59type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
60use futures::StreamExt;
61use async_channel::unbounded;
62// const VERSION: &str = "0.0.39";
63
64
/// The `Client` struct provides the client for the OpenIAP service.
/// Initialize a new client, by calling the [Client::new_connect] method.
///
/// Every field is wrapped in an `Arc`, or is a channel endpoint, and the struct derives `Clone`,
/// so clones share the same underlying state.
#[derive(Clone)]
pub struct Client {
    /// Set once a connect has been performed; `connect_async` checks it and calls `reconnect`
    /// instead of connecting a second time.
    connect_called: Arc<std::sync::Mutex<bool>>,
    /// The tokio runtime created by the blocking [Client::connect]; `None` until then.
    runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,

    /// Keep track of usage of the client (handed to the telemetry setup in `load_config`).
    stats: Arc<std::sync::Mutex<ClientStatistics>>,

    /// Join handles of tasks owned by this client.
    // NOTE(review): where handles are pushed and awaited is not visible in this part of the file.
    task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
    /// The inner client object (gRPC or WebSocket transport, `ClientEnum::None` until connected).
    pub client: Arc<std::sync::Mutex<ClientEnum>>,
    /// The signed in user.
    user: Arc<std::sync::Mutex<Option<User>>>,
    /// The inner client.
    pub inner: Arc<Mutex<ClientInner>>,
    /// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
    pub config: Arc<std::sync::Mutex<Option<Config>>>,
    /// Should client automatically reconnect, if disconnected? Starts out as `true`.
    pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
    /// URL used to connect to server, processed and without credentials
    pub url: Arc<std::sync::Mutex<String>>,
    /// Username used to connect to server
    pub username: Arc<std::sync::Mutex<String>>,
    /// Password used to connect to server
    pub password: Arc<std::sync::Mutex<String>>,
    /// JWT token used to connect to server
    pub jwt: Arc<std::sync::Mutex<String>>,
    /// Service name passed to telemetry initialization; starts out as `"rust"`.
    service_name: Arc<std::sync::Mutex<String>>,
    /// Agent name passed to telemetry initialization; starts out as `"rust"`.
    agent_name: Arc<std::sync::Mutex<String>>,
    /// Agent version passed to telemetry initialization; starts out as this crate's version.
    agent_version: Arc<std::sync::Mutex<String>>,
    /// Sending half of the unbounded [ClientEvent] channel.
    event_sender: async_channel::Sender<ClientEvent>,
    /// Receiving half of the unbounded [ClientEvent] channel.
    event_receiver: async_channel::Receiver<ClientEvent>,
    /// Sending half of the unbounded `Envelope` channel.
    // NOTE(review): presumably envelopes queued for sending to the server; confirm against the ws/grpc modules.
    out_envelope_sender: async_channel::Sender<Envelope>,
    /// Receiving half of the unbounded `Envelope` channel.
    out_envelope_receiver: async_channel::Receiver<Envelope>,
    /// Name of the reply queue used for RPC calls, `None` until one is set.
    // NOTE(review): presumably registered lazily on first RPC use; not visible here.
    rpc_reply_queue: Arc<tokio::sync::Mutex<Option<String>>>,
    /// Callback associated with the RPC reply queue, `None` until one is set.
    rpc_callback: Arc<tokio::sync::Mutex<Option<QueueCallbackFn>>>,

    /// The client connection state.
    pub state: Arc<std::sync::Mutex<ClientState>>,
    /// Increasing message count, used as unique id for messages. Starts at `-1`.
    pub msgcount: Arc<std::sync::Mutex<i32>>,
    /// Reconnect interval in milliseconds, this will slowly increase if we keep getting disconnected.
    /// Starts at `1000`.
    pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
    /// The default timeout for requests (30 seconds for a freshly created client).
    pub default_timeout: Arc<std::sync::Mutex<tokio::time::Duration>>,
}
/// The `ClientStatistics` struct provides the statistics for usage of the client
///
/// All fields are plain `u64` counters that start at zero (`Default`).
// NOTE(review): the field names suggest one counter per client command plus connection and
// packet totals; the code that increments them is not visible in this part of the file.
#[derive(Clone, Default)]
pub struct ClientStatistics {
    connection_attempts: u64,
    connections: u64,
    package_tx: u64,
    package_rx: u64,
    signin: u64,
    download: u64,
    getdocumentversion: u64,
    customcommand: u64,
    listcollections: u64,
    createcollection: u64,
    dropcollection: u64,
    ensurecustomer: u64,
    invokeopenrpa: u64,
    registerqueue: u64,
    registerexchange: u64,
    unregisterqueue: u64,
    watch: u64,
    unwatch: u64,
    queuemessage: u64,
    pushworkitem: u64,
    pushworkitems: u64,
    popworkitem: u64,
    updateworkitem: u64,
    deleteworkitem: u64,
    addworkitemqueue: u64,
    updateworkitemqueue: u64,
    deleteworkitemqueue: u64,
    getindexes: u64,
    createindex: u64,
    dropindex: u64,
    upload: u64,
    query: u64,
    count: u64,
    distinct: u64,
    aggregate: u64,
    insertone: u64,
    insertmany: u64,
    insertorupdateone: u64,
    insertorupdatemany: u64,
    updateone: u64,
    updatedocument: u64,
    deleteone: u64,
    deletemany: u64,
}
162// type QueueCallbackFn = Box<dyn Fn(&Client, QueueEvent) -> Option<String> + Send + Sync>;
163use futures::future::BoxFuture;
/// Shared, thread-safe callback for queue events: it receives the client and the event and
/// returns a boxed future that resolves to an optional `String`.
// NOTE(review): the returned string is presumably the reply payload for the message; confirm at the call site.
type QueueCallbackFn =
    Arc<dyn Fn(Arc<Client>, QueueEvent) -> BoxFuture<'static, Option<String>> + Send + Sync>;
166
167// type ExchangeCallbackFn = Box<dyn Fn(&Client, QueueEvent) + Send + Sync>;
/// The `ClientInner` struct provides the inner client for the OpenIAP service.
///
/// It holds the lookup tables for in-flight requests and registered callbacks. Each table is an
/// `Arc<Mutex<HashMap<String, _>>>`, so clones of `ClientInner` share the same maps.
#[derive(Clone)]
pub struct ClientInner {
    /// List of queries (messages sent to the server that we are waiting on a response for).
    /// Each entry holds the one-shot sender used to deliver the reply `Envelope`.
    pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
    /// Active streams the server (or client) has opened; each entry is a sender of byte chunks.
    pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
    /// List of active watches (change streams), each with the callback to run on a `WatchEvent`.
    #[allow(clippy::type_complexity)]
    pub watches:
        Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
    /// List of active queues (message queues / MQTT queues or exchanges), each with its callback.
    #[allow(clippy::type_complexity)]
    pub queues:
        Arc<Mutex<std::collections::HashMap<String, QueueCallbackFn>>>,
}
/// Client enum, used to determine which client to use.
///
/// A new [Client] starts with `None`; `connect_async` replaces it with `Grpc` or `WS`
/// depending on the URL it was given.
#[derive(Clone, Debug)]
pub enum ClientEnum {
    /// Not set yet
    None,
    /// Used when client wants to connect using gRPC
    Grpc(FlowServiceClient<tonic::transport::Channel>),
    /// Used when client wants to connect using websockets
    WS(Arc<Mutex<Sock>>)
}
/// Client event enum, used to determine which event has occurred.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientEvent {
    /// The client is connecting
    Connecting,
    /// The client has connected
    Connected,
    /// The client has disconnected
    // NOTE(review): the `String` presumably carries the disconnect reason or error text; confirm where it is emitted.
    Disconnected(String),
    /// The client has signed in
    SignedIn,
    // Variants below are not implemented yet and are kept as a sketch of planned events.
    // The client has signed out
    // SignedOut,
    // The client has received a message
    // Message(Envelope),
    // The client has received a ping event from the server
    // Ping,
    // The client has received a stream
    // Stream(Vec<u8>),
    // The client has received a watch event
    // Watch(WatchEvent),
    // The client has received a queue event
    // Queue(QueueEvent),
}
/// Client state enum, describing where the client currently is in its connection lifecycle.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientState {
    /// The client is disconnected
    Disconnected,
    /// The client is connecting
    Connecting,
    /// The client is connected
    Connected,
    /// The client is signed in and connected
    Signedin
}
/// Renders the state as its variant name (for example `Signedin`).
impl Display for ClientState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            ClientState::Disconnected => "Disconnected",
            ClientState::Connecting => "Connecting",
            ClientState::Connected => "Connected",
            ClientState::Signedin => "Signedin",
        };
        f.write_str(label)
    }
}
240
/// The `Config` struct provides the configuration for the OpenIAP service we are connecting to.
///
/// It is deserialized from the JSON body of the server's `/config` endpoint (see
/// `Client::load_config`). Every field is marked `#[serde(default)]`, so a key missing from the
/// response falls back to the type's default (empty string, `false`, `0` or an empty list).
#[derive(Debug, Clone, serde::Deserialize)]
// Most fields are deserialized but not read by this crate, hence the blanket allow.
#[allow(dead_code)]
pub struct Config {
    #[serde(default)]
    wshost: String,
    #[serde(default)]
    wsurl: String,
    // When non-empty, `load_config` uses this as the API hostname passed to telemetry.
    #[serde(default)]
    domain: String,
    #[serde(default)]
    auto_create_users: bool,
    #[serde(default)]
    namespace: String,
    #[serde(default)]
    agent_domain_schema: String,
    #[serde(default)]
    version: String,
    #[serde(default)]
    validate_emails: bool,
    #[serde(default)]
    forgot_pass_emails: bool,
    #[serde(default)]
    supports_watch: bool,
    #[serde(default)]
    amqp_enabled_exchange: bool,
    #[serde(default)]
    multi_tenant: bool,
    #[serde(default)]
    enable_entity_restriction: bool,
    #[serde(default)]
    enable_web_tours: bool,
    #[serde(default)]
    collections_with_text_index: Vec<String>,
    #[serde(default)]
    timeseries_collections: Vec<String>,
    #[serde(default)]
    ping_clients_interval: i32,
    #[serde(default)]
    validlicense: bool,
    #[serde(default)]
    forceddomains: Vec<String>,
    #[serde(default)]
    grafana_url: String,
    // The three OpenTelemetry endpoints below, when non-empty, override the
    // OTEL_METRIC_URL / OTEL_TRACE_URL / OTEL_LOG_URL environment variables in `load_config`.
    #[serde(default)]
    otel_metric_url: String,
    #[serde(default)]
    otel_trace_url: String,
    #[serde(default)]
    otel_log_url: String,
    // Controls whether `load_config` initializes telemetry (only with the `otel` feature).
    #[serde(default)]
    enable_analytics: bool,
}
294impl std::fmt::Debug for ClientInner {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        f.debug_struct("ClientInner")
297            // .field("client", &self.client)
298            .field("queries", &self.queries)
299            .field("streams", &self.streams)
300            .finish()
301    }
302}
303impl Default for Client {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
/// The `EnvConfig` struct provides per-command settings for the OpenIAP service:
/// a custom JWT to use for only this command, plus the traceid and spanid to use for tracing.
#[derive(Clone, PartialEq)]
pub struct EnvConfig {
    /// The JWT token to use for this command.
    pub jwt: String,
    /// The traceid to use for this command.
    pub traceid: String,
    /// The spanid to use for this command.
    pub spanid: String,
}
320impl EnvConfig {
321    /// Create a new EnvConfig.
322    #[tracing::instrument(skip_all)]
323    pub fn new() -> Self {
324        Self {
325            jwt: String::new(),
326            traceid: String::new(),
327            spanid: String::new(),
328        }
329    }
330    /// Create a new EnvConfig with a JWT.
331    #[tracing::instrument(skip_all)]
332    pub fn with_jwt(jwt: &str) -> Self {
333        Self {
334            jwt: jwt.to_string(),
335            traceid: String::new(),
336            spanid: String::new(),
337        }
338    }
339}
340
341impl Client {
342    /// Create a new client.
343    pub fn new() -> Self {
344        let (ces, cer) = unbounded::<ClientEvent>();
345        let (out_es, out_er) = unbounded::<Envelope>();
346        let version = env!("CARGO_PKG_VERSION");
347        Self {
348            task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
349            stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
350            user: Arc::new(std::sync::Mutex::new(None)),
351            client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
352            connect_called: Arc::new(std::sync::Mutex::new(false)),
353            runtime: Arc::new(std::sync::Mutex::new(None)),
354            msgcount: Arc::new(std::sync::Mutex::new(-1)),
355            reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
356            rpc_reply_queue: Arc::new(tokio::sync::Mutex::new(None)),
357            rpc_callback: Arc::new(tokio::sync::Mutex::new(None)),
358            inner: Arc::new(Mutex::new(ClientInner {
359                queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
360                streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
361                watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
362                queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
363            })),
364            config: Arc::new(std::sync::Mutex::new(None)),
365            auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
366            url: Arc::new(std::sync::Mutex::new("".to_string())),
367            username: Arc::new(std::sync::Mutex::new("".to_string())),
368            password: Arc::new(std::sync::Mutex::new("".to_string())),
369            jwt: Arc::new(std::sync::Mutex::new("".to_string())),
370            service_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
371            agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
372            agent_version: Arc::new(std::sync::Mutex::new(version.to_string())),
373            event_sender: ces,
374            event_receiver: cer,
375            out_envelope_sender: out_es,
376            out_envelope_receiver: out_er,
377            state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
378            default_timeout: Arc::new(std::sync::Mutex::new(Duration::from_secs(30))),
379        }
380    }
381    /// Connect the client to the OpenIAP server.
382    #[tracing::instrument(skip_all)]
383    pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
384        let rt = match tokio::runtime::Runtime::new() {
385            Ok(rt) => rt,
386            Err(e) => {
387                return Err(OpenIAPError::ClientError(format!(
388                    "Failed to create tokio runtime: {}",
389                    e
390                )));
391            }
392        };
393        self.set_runtime(Some(rt));
394        tokio::task::block_in_place(|| {
395            let handle = self.get_runtime_handle();
396            handle.block_on(self.connect_async(dst))
397        })
398    }
399
400    /// Load the configuration from the server.
401    #[allow(unused_variables)]
402    pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
403        let config: Option<Config>;
404        let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
405        let mut port = url.port().unwrap_or(80);
406        if issecure {
407            port = 443;
408        }
409        let mut host = url.host_str().unwrap_or("localhost.openiap.io").replace("grpc.", "");
410        if host.starts_with("api-grpc") {
411            host = "api".to_string();
412        }
413        if port == 50051 {
414            port = 3000;
415        }
416        let configurl = if issecure {
417            format!(
418                "{}://{}:{}/config",
419                "https",
420                host,
421                port
422            )
423        } else {
424            format!(
425                "{}://{}:{}/config",
426                "http",
427                host,
428                port
429            )
430        };
431
432        let configurl = url::Url::parse(configurl.as_str())
433            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
434        trace!("Getting config from: {}", configurl);
435        let o = minreq::get(configurl).with_timeout(5).send();
436        match o {
437            Ok(_) => {
438                let response = match o {
439                    Ok(response) => response,
440                    Err(e) => {
441                        error!("Failed to get config: {}", e);
442                        return None;
443                    }
444                };
445                if response.status_code == 200 {
446                    let body = response.as_str().unwrap();
447                    config = Some(match serde_json::from_str(body) {
448                        Ok(config) => config,
449                        Err(e) => {
450                            error!("Failed to parse config: {}", e);
451                            return None;
452                        }
453                    });
454                } else {
455                    config = None;
456                }
457            }
458            Err(e) => {
459                error!("Failed to get config: {}", e);
460                return None;
461            }
462        }
463        let mut _enable_analytics = true;
464        let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
465        let mut _otel_trace_url = std::env::var("OTEL_TRACE_URL").unwrap_or_default();
466        let mut _otel_log_url = std::env::var("OTEL_LOG_URL").unwrap_or_default();
467        let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
468        if apihostname.starts_with("grpc.") {
469            apihostname = apihostname[5..].to_string();
470        }
471    
472        if config.is_some() {
473            let config = config.as_ref().unwrap();
474            if !config.otel_metric_url.is_empty() {
475                _otel_metric_url = config.otel_metric_url.clone();
476            }
477            if !config.otel_trace_url.is_empty() {
478                _otel_trace_url = config.otel_trace_url.clone();
479            }
480            if !config.otel_log_url.is_empty() {
481                _otel_log_url = config.otel_log_url.clone();
482            }
483            if !config.domain.is_empty() {
484                apihostname = config.domain.clone();
485            }
486            _enable_analytics = config.enable_analytics;
487        }
488        #[cfg(feature = "otel")]
489        if _enable_analytics {
490            let service_name = self.get_service_name();
491            let agent_name = self.get_agent_name();
492            let agent_version = self.get_agent_version();
493            let version = env!("CARGO_PKG_VERSION");
494            match otel::init_telemetry(&service_name, &agent_name, &agent_version, &version, &apihostname, _otel_metric_url.as_str(),
495            _otel_trace_url.as_str(),  _otel_log_url.as_str(),
496            &self.stats) {
497                Ok(_) => (),
498                Err(e) => {
499                    error!("Failed to initialize telemetry: {}", e);
500                    return None;
501                }
502            }
503        }
504        config
505    }
506
    /// Connect the client to the OpenIAP server.
    ///
    /// If a connect has already been performed on this client, this only re-enables
    /// auto-reconnect and delegates to `reconnect`.
    ///
    /// Otherwise the steps are:
    /// 1. Resolve the URL: `dst`, or when empty the first non-empty of the environment
    ///    variables `apiurl`, `grpcapiurl`, `OPENIAP_URL`, `OPENIAP_APIURL`, `wsapiurl`.
    /// 2. Store username and password from the URL, when both are present.
    /// 3. Pick the transport: gRPC when the scheme is `grpc`, the domain starts with `grpc.`
    ///    or the port is 50051; WebSocket otherwise.
    /// 4. Normalize the URL to `scheme://host[:port]` and load the server config.
    /// 5. Open the transport, store client/config/url, start the stream handling and finish
    ///    with `post_connected` (auto sign-in).
    ///
    /// # Errors
    /// Returns `OpenIAPError::ClientError` when no URL is available, the URL cannot be parsed,
    /// the scheme is not one of `http`, `https`, `grpc`, `ws`, `wss`, or the transport cannot
    /// be established. Errors from `setup_grpc_stream` and `post_connected` are passed through.
    #[tracing::instrument(skip_all)]
    pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
        #[cfg(test)]
        {   
            // enable_tracing("openiap=trace", "new");
            // enable_tracing("openiap=debug", "new");
            // enable_tracing("trace", "");
            enable_tracing("openiap=error", "");
            // enable_tracing("openiap=debug", "");
        }
        // A client connects only once; later calls go through the reconnect path.
        if self.is_connect_called() {
            self.set_auto_reconnect(true);
            return self.reconnect().await;
        }
        let mut strurl = dst.to_string();
        // No explicit URL: fall back to the environment, first non-empty variable wins.
        if strurl.is_empty() {
            strurl = std::env::var("apiurl").unwrap_or("".to_string());
            if strurl.is_empty() {
                strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
            }
            if strurl.is_empty() {
                strurl = std::env::var("OPENIAP_URL").unwrap_or("".to_string());
            }
            if strurl.is_empty() {
                strurl = std::env::var("OPENIAP_APIURL").unwrap_or("".to_string());
            }
            if strurl.is_empty() {
                strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
            }
        }
        if strurl.is_empty() {
            return Err(OpenIAPError::ClientError("No URL provided".to_string()));
        }
        let url = url::Url::parse(strurl.as_str())
            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
        // Credentials embedded in the URL are kept only when both parts are present.
        // NOTE(review): the url crate documents these values as percent-encoded; confirm
        // whether they should be decoded before being used for sign-in.
        let mut username = "".to_string();
        let mut password = "".to_string();
        if let Some(p) = url.password() {
            password = p.to_string();
        }
        if !url.username().is_empty() {
            username = url.username().to_string();
        }
        if !username.is_empty() && !password.is_empty() {
            self.set_username(&username);
            self.set_password(&password);
        }
        // Transport selection happens on the URL as given, before any rewriting below.
        let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
        if url.scheme() != "http"
            && url.scheme() != "https"
            && url.scheme() != "grpc"
            && url.scheme() != "ws"
            && url.scheme() != "wss"
        {
            return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
        }
        // `grpc://` is rewritten to https (port 443) or plain http (any other port, default 80).
        if url.scheme() == "grpc" {
            if url.port() == Some(443) {
                strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
            } else { 
                strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
            }
        }
        // Re-parse the (possibly rewritten) URL and rebuild it as scheme://host[:port],
        // which drops credentials, path and query.
        let url = url::Url::parse(strurl.as_str())
            .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
        if url.port().is_none() {
            strurl = format!(
                "{}://{}",
                url.scheme(),
                url.host_str().unwrap_or("app.openiap.io")
            );
        } else {
            strurl = format!(
                "{}://{}:{}",
                url.scheme(),
                url.host_str().unwrap_or("localhost.openiap.io"),
                url.port().unwrap_or(80)
            );
        }
        debug!("Connecting to {}", strurl);
        let config = self.load_config(strurl.as_str(), &url).await;
        if !usegprc {
            // WebSocket transport: the endpoint lives under /ws/v2.
            strurl = format!("{}/ws/v2", strurl);

            // NOTE(review): `_stream_tx` is never handed to anyone in this function, so the
            // receiver task spawned below presumably never sees an envelope; confirm whether
            // this legacy receiver is still needed.
            let (_stream_tx, stream_rx) = mpsc::channel(60);

            let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
                Ok((socket, _)) => socket,
                Err(e) => {
                    return Err(OpenIAPError::ClientError(format!(
                        "Failed to connect to WS: {}",
                        e
                    )));
                }
            };
            // Record the connection on the client before the socket handling is set up.
            self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
            self.set_connect_called(true);
            self.set_config(config);
            self.set_url(&strurl);
            match self.setup_ws(&strurl).await {
                Ok(_) => (),
                Err(e) => {
                    return Err(OpenIAPError::ClientError(format!(
                        "Failed to setup WS: {}",
                        e
                    )));
                }
            }
            let client2 = self.clone();
            // tokio::task::Builder::new().name("Old Websocket receiver").spawn(async move {
            tokio::task::spawn(async move {
                tokio_stream::wrappers::ReceiverStream::new(stream_rx)
                    .for_each(|envelope: Envelope| async {
                        let command = envelope.command.clone();
                        let rid = envelope.rid.clone();
                        let id = envelope.id.clone();
                        trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
                        client2.parse_incomming_envelope(envelope).await;
                    })
                    .await;
            }); // .map_err(|e| OpenIAPError::ClientError(format!("Failed to spawn Old Websocket receiver task: {:?}", e)))?;
        } else {
            if url.scheme() == "http" {
                // Plain-text gRPC: connect using the normalized URL, including its port.
                let response = Client::connect_grpc(strurl.clone()).await;
                match response {
                    Ok(client) => {
                        self.set_client(ClientEnum::Grpc(client));
                    }
                    Err(e) => {
                        return Err(OpenIAPError::ClientError(format!(
                            "Failed to connect: {}",
                            e
                        )));
                    }
                }
            } else {
                // TLS gRPC: build the channel by hand so native root certificates are used.
                // NOTE(review): the authority is built from the host only, so an explicit
                // port in the URL is not carried over on this path; confirm this is intended.
                let uri = tonic::transport::Uri::builder()
                    .scheme(url.scheme())
                    .authority(url.host().unwrap().to_string())
                    .path_and_query("/")
                    .build();
                let uri = match uri {
                    Ok(uri) => uri,
                    Err(e) => {
                        return Err(OpenIAPError::ClientError(format!(
                            "Failed to build URI: {}",
                            e
                        )));
                    }
                };
                let channel = Channel::builder(uri)
                    .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
                let channel = match channel {
                    Ok(channel) => channel,
                    Err(e) => {
                        return Err(OpenIAPError::ClientError(format!(
                            "Failed to build channel: {}",
                            e
                        )));
                    }
                };
                let channel = channel.connect().await;
                let channel = match channel {
                    Ok(channel) => channel,
                    Err(e) => {
                        return Err(OpenIAPError::ClientError(format!(
                            "Failed to connect: {}",
                            e
                        )));
                    }
                };
                self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
            }
            // Record the connection on the client before the gRPC stream is set up.
            self.set_connect_called(true);
            self.set_config(config);
            self.set_url(&strurl);
            self.setup_grpc_stream().await?;
        };
        // Sign in (or verify guest access) now that the transport is up.
        self.post_connected().await
    }
688
689    /// Connect will initializes a new client and starts a connection to an OpenIAP server.\
690    /// Use "" to autodetect the server from the environment variables (apiurl or grpcapiurl), or provide a URL.\
691    /// \
692    /// You can add username and password, to login using local provider, or set them using OPENIAP_USERNAME and OPENIAP_PASSWORD environment variables.
693    /// It is highly recommended to not user username and password, but instead use a JWT token, set using the OPENIAP_JWT (or jwt) environment variable.
694    /// You can use the openiap vs.code extension to manage this, if you need to generate one your self, login to the OpenIAP server and then open the /jwtlong page.
695    /// If credentials are not provided, the client will run as guest.\
696    /// If credentials are found, it will call [Client::signin] after successfully connecting to the server.
697    /// 
698    /// To troubleshoot issues, call [enable_tracing].
699    /// ```
700    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
701    /// #[tokio::main]
702    /// async fn main() -> Result<(), OpenIAPError> {
703    ///     let client = Client::new_connect("").await?;
704    ///     let q = client.query( QueryRequest::with_projection(
705    ///         "entities",
706    ///         "{}",
707    ///         "{\"name\":1}"
708    ///     )).await?;
709    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
710    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
711    ///     for item in items {
712    ///         println!("Item: {:?}", item);
713    ///     }
714    ///     Ok(())
715    /// }
716    /// ```
717    #[tracing::instrument(skip_all)]
718    pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
719        #[cfg(test)]
720        {
721            enable_tracing("openiap=error", "");
722        }
723        let client  = Client::new();
724        client.connect_async(dst).await?;
725        Ok(client)
726    }
727    /// Handle auto-signin after a connection has been established.
728    pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
729        if self.get_username().is_empty() && self.get_password().is_empty() {
730            self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
731            self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
732        }
733        if !self.get_username().is_empty() && !self.get_password().is_empty() {
734            debug!("Signing in with username: {}", self.get_username());
735            let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
736            let loginresponse = self.signin(signin).await;
737            match loginresponse {
738                Ok(response) => {
739                    self.reset_reconnect_ms();
740                    self.set_connected(ClientState::Connected, None);
741                    info!("Signed in as {}", response.user.as_ref().unwrap().username);
742                    Ok(())
743                }
744                Err(_e) => {
745                    self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
746                    Err(_e)
747                }
748            }
749        } else {
750            self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
751            if self.get_jwt().is_empty() {
752                self.set_jwt(&std::env::var("jwt").unwrap_or_default());
753            }
754            if !self.get_jwt().is_empty() {
755                debug!("Signing in with JWT");
756                let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
757                let loginresponse = self.signin(signin).await;
758                match loginresponse {
759                    Ok(response) => match response.user {
760                        Some(user) => {
761                            self.reset_reconnect_ms();
762                            info!("Signed in as {}", user.username);
763                            self.set_connected(ClientState::Connected, None);
764                            Ok(())
765                        }
766                        None => {
767                            self.reset_reconnect_ms();
768                            info!("Signed in as guest");
769                            self.set_connected(ClientState::Connected, None);
770                            Ok(())
771                            // Err(OpenIAPError::ClientError("Signin returned no user object".to_string()))
772                        }
773                    },
774                    Err(_e) => {
775                        self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
776                        Err(_e)
777                    }
778                }
779            } else {
780                self.reset_reconnect_ms();
781                match self.get_element().await {
782                    Ok(_) => {
783                        debug!("Connected, No credentials provided so is running as guest");
784                        self.set_connected(ClientState::Connected, None);
785                        Ok(())
786                    },
787                    Err(e) => {
788                        self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
789                        Err(e)
790                    }
791                }
792            }
793        }
794    }
795    /// Reconnect will attempt to reconnect to the OpenIAP server.
796    #[tracing::instrument(skip_all)]
797    pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
798        let state = self.get_state();
799        if state == ClientState::Connected || state == ClientState::Signedin {
800            return Ok(());
801        }
802        if !self.is_auto_reconnect() {
803            return Ok(());   
804        }
805        let client = self.get_client();
806    
807        match client {
808            ClientEnum::WS(ref _client) => {
809                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
810                self.setup_ws(&self.get_url()).await?;
811                debug!("Completed reconnecting to websocket");
812                self.post_connected().await
813            }
814            ClientEnum::Grpc(ref _client) => {
815                info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
816                match self.setup_grpc_stream().await {
817                    Ok(_) => {
818                        debug!("Completed reconnecting to gRPC");
819                        self.post_connected().await
820                    },
821                    Err(e) => {
822                        return Err(OpenIAPError::ClientError(format!(
823                            "Failed to setup gRPC stream: {}",
824                            e
825                        )));
826                    }
827                }
828            }
829            ClientEnum::None => {
830                return Err(OpenIAPError::ClientError("Invalid client".to_string()));
831            }
832        }
833    }
834    /// Disconnect the client from the OpenIAP server.
835    pub fn disconnect(&self) {
836        self.set_auto_reconnect(false);
837        self.set_connected(ClientState::Disconnected, Some("Disconnected"));
838    }
    /// Update the client state and run the side effects of the state transition.
    ///
    /// `state` is the requested new state; `message` is only used when the client
    /// transitions to [`ClientState::Disconnected`], where it is logged and forwarded
    /// as the payload of [`ClientEvent::Disconnected`].
    ///
    /// Side effects, all of which are only started when a Tokio runtime is available
    /// on the calling thread (`Handle::try_current`):
    /// - entering `Connecting`: bumps `stats.connection_attempts`, emits `ClientEvent::Connecting`;
    /// - entering `Connected`/`Signedin` from `Disconnected`/`Connecting`: bumps
    ///   `stats.connections`, emits `ClientEvent::Connected`;
    /// - entering `Signedin`: emits `ClientEvent::SignedIn`;
    /// - entering `Disconnected`: clears the rpc reply queue and callback, emits
    ///   `ClientEvent::Disconnected`, aborts the registered task handles, fails all
    ///   pending queries/streams, drops registered queues and watches, and — when
    ///   auto-reconnect is enabled — schedules a reconnect attempt.
    pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
        {
            let current = self.get_state();
            trace!("Set connected: {:?} from {:?}", state, current);
            // A `Connected` request never downgrades an already signed-in client.
            if state == ClientState::Connected && current == ClientState::Signedin {
                self.set_state(ClientState::Signedin);
            } else {
                self.set_state(state.clone());
            }
            // Transition into Disconnected: forget the rpc reply queue and callback.
            if state == ClientState::Disconnected && !current.eq(&state) {
                let me = self.clone();
                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
                    tokio::task::spawn(async move {
                        let mut reply_queue_guard = me.rpc_reply_queue.lock().await;
                        let mut callback_guard = me.rpc_callback.lock().await;
                        *reply_queue_guard = None;
                        *callback_guard = None;
                    });
                }
            }
            // Transition into Connecting: count the attempt and notify listeners.
            if state == ClientState::Connecting && !current.eq(&state) {
                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
                    self.stats.lock().unwrap().connection_attempts += 1;
                    let me = self.clone();
                    tokio::task::spawn(async move {
                        // NOTE(review): the unwrap panics inside the spawned task if the event channel is closed.
                        me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
                    });
                }

            }
            // Connection (re)established: count it and notify listeners.
            if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) { 
                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
                    self.stats.lock().unwrap().connections += 1;
                    let me = self.clone();
                    tokio::task::spawn(async move {
                        me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
                    });
                }
            }
            // Transition into Signedin: notify listeners.
            if state == ClientState::Signedin && current != ClientState::Signedin {
                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
                    let me = self.clone();
                    tokio::task::spawn(async move {
                        me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
                    });
                }
            }
            // Transition into Disconnected: notify, tear down, and optionally reconnect.
            if state == ClientState::Disconnected && !current.eq(&state) {
                if message.is_some() {
                    debug!("Disconnected: {}", message.unwrap());
                } else {
                    debug!("Disconnected");
                }
                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
                    let me = self.clone();
                    // Own the message so it can move into the spawned task.
                    let message = match message {
                        Some(message) => message.to_string(),
                        None => "".to_string(),
                    };
                        tokio::task::spawn(async move {
                            me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
                        });
                }

                // Abort every task registered through `push_handle`.
                self.kill_handles();
                if let Ok(_handle) = tokio::runtime::Handle::try_current() {
                    let client = self.clone();
                        tokio::task::spawn(async move {
                        {
                            let inner = client.inner.lock().await;
                            // Answer every pending query with a 500 "Disconnected" error envelope.
                            let mut queries = inner.queries.lock().await;
                            let ids = queries.keys().cloned().collect::<Vec<String>>();
                            debug!("********************************************** Cleaning up");
                            for id in ids {
                                let err = ErrorResponse {
                                    code: 500,
                                    message: "Disconnected".to_string(),
                                    stack: "".to_string(),
                                };
                                let envelope = err.to_envelope();
                                let tx = queries.remove(&id).unwrap();
                                debug!("kill query: {}", id);
                                let _ = tx.send(envelope);
                            }
                            // Send an empty chunk to every open stream and drop its sender.
                            let mut streams = inner.streams.lock().await;
                            let ids = streams.keys().cloned().collect::<Vec<String>>();
                            for id in ids {
                                let tx = streams.remove(&id).unwrap();
                                debug!("kill stream: {}", id);
                                let _ = tx.send(Vec::new()).await;
                            }
                            // Drop all registered queues.
                            let mut queues = inner.queues.lock().await;
                            let ids = queues.keys().cloned().collect::<Vec<String>>();
                            for id in ids {
                                let _ = queues.remove(&id).unwrap();
                            }
                            // Drop all registered watches.
                            let mut watches = inner.watches.lock().await;
                            let ids = watches.keys().cloned().collect::<Vec<String>>();
                            for id in ids {
                                let _ = watches.remove(&id).unwrap();
                            }
                            debug!("**********************************************************");
                        }
                        if client.is_auto_reconnect() {
                            trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
                            tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
                            // Re-check after the sleep: `disconnect()` may have disabled auto-reconnect meanwhile.
                            if client.is_auto_reconnect() {
                                client.inc_reconnect_ms();
                                // let mut client = client.clone();
                                trace!("Reconnecting . . .");
                                client.reconnect().await.unwrap_or_else(|e| {
                                    error!("Failed to reconnect: {}", e);
                                    // NOTE(review): presumably re-enters this branch to schedule the next attempt,
                                    // which only happens if the state is no longer Disconnected by now — confirm.
                                    client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
                                });
                            } else {
                                debug!("Not reconnecting");
                            }
                        } else {
                            debug!("Reconnecting disabled, stop now");
                        }
                    });
                }
    
            }
        }
    }
966    /// Get client state
967    pub fn get_state(&self) -> ClientState {
968        let conn = self.state.lock().unwrap();
969        conn.clone()
970    }
971    /// Set client state
972    pub fn set_state(&self, state: ClientState) {
973        let mut conn = self.state.lock().unwrap();
974        *conn = state;
975    }
976    /// Set the msgcount value
977    pub fn set_msgcount(&self, msgcount: i32) {
978        let mut current = self.msgcount.lock().unwrap();
979        trace!("Set msgcount: {} from {}", msgcount, *current);
980        *current = msgcount;
981    }
982    /// Increment the msgcount value
983    pub fn inc_msgcount(&self) -> i32 {
984        let mut current = self.msgcount.lock().unwrap();
985        *current += 1;
986        *current
987    }
988    /// Return value of reconnect_ms
989    pub fn get_reconnect_ms(&self) -> i32 {
990        let reconnect_ms = self.reconnect_ms.lock().unwrap();
991        *reconnect_ms
992    }
993    /// Increment the reconnect_ms value
994    pub fn reset_reconnect_ms(&self) {
995        let mut current = self.reconnect_ms.lock().unwrap();
996        *current = 500;
997    }
998    /// Increment the reconnect_ms value
999    pub fn inc_reconnect_ms(&self) -> i32 {
1000        let mut current = self.reconnect_ms.lock().unwrap();
1001        if *current < 30000 {
1002            *current += 500;
1003        }
1004        *current
1005    }
1006    
1007    /// Push tokio task handle to the task_handles vector
1008    pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
1009        let mut handles = self.task_handles.lock().unwrap();
1010        handles.push(handle);
1011    }
1012    /// Kill all tokio task handles in the task_handles vector
1013    pub fn kill_handles(&self) {
1014        let mut handles = self.task_handles.lock().unwrap();
1015        for handle in handles.iter() {
1016            // let id = handle.id();
1017            // debug!("Killing handle {}", id);
1018            debug!("Killing handle");
1019            if !handle.is_finished() {
1020                handle.abort();
1021            }
1022        }
1023        handles.clear();
1024        // if let Ok(_handle) = tokio::runtime::Handle::try_current() {
1025        //     let runtime = self.get_runtime();
1026        //     if let Some(rt) = runtime.lock().unwrap().take() {
1027        //         rt.shutdown_background();
1028        //     }
1029        // }
1030    }
1031
1032
1033    /// Return value of the msgcount flag
1034    #[tracing::instrument(skip_all)]
1035    fn get_msgcount(&self) -> i32 {
1036        let msgcount = self.msgcount.lock().unwrap();
1037        *msgcount
1038    }
1039    /// Set the default timeout for the client commands
1040    pub fn set_default_timeout(&self, timeout: Duration) {
1041        let mut current = self.default_timeout.lock().unwrap();
1042        trace!("Set default_timeout: {} from {:?}", timeout.as_secs(), current.as_secs());
1043        *current = timeout;
1044    }
1045    /// Return the default timeout for the client commands
1046    pub fn get_default_timeout(&self) -> Duration {
1047        let current = self.default_timeout.lock().unwrap();
1048        current.clone()
1049    }
1050    /// Set the connect_called flag to true or false
1051    #[tracing::instrument(skip_all)]
1052    pub fn set_connect_called(&self, connect_called: bool) {
1053        let mut current = self.connect_called.lock().unwrap();
1054        trace!("Set connect_called: {} from {}", connect_called, *current);
1055        *current = connect_called;
1056    }
1057    /// Return value of the connect_called flag
1058    #[tracing::instrument(skip_all)]
1059    fn is_connect_called(&self) -> bool {
1060        let connect_called = self.connect_called.lock().unwrap();
1061        *connect_called
1062    }
1063    /// Set the auto_reconnect flag to true or false
1064    #[tracing::instrument(skip_all)]
1065    pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1066        let mut current = self.auto_reconnect.lock().unwrap();
1067        trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1068        *current = auto_reconnect;
1069    }
1070    /// Return value of the auto_reconnect flag
1071    #[tracing::instrument(skip_all)]
1072    fn is_auto_reconnect(&self) -> bool {
1073        let auto_reconnect = self.auto_reconnect.lock().unwrap();
1074        *auto_reconnect
1075    }
1076    /// Set the url flag to true or false
1077    #[tracing::instrument(skip_all)]
1078    pub fn set_url(&self, url: &str) {
1079        let mut current = self.url.lock().unwrap();
1080        trace!("Set url: {} from {}", url, *current);
1081        *current = url.to_string();
1082    }
1083    /// Return value of the url string
1084    #[tracing::instrument(skip_all)]
1085    fn get_url(&self) -> String {
1086        let url = self.url.lock().unwrap();
1087        url.to_string()
1088    }
1089    /// Set the username flag to true or false
1090    #[tracing::instrument(skip_all)]
1091    pub fn set_username(&self, username: &str) {
1092        let mut current = self.username.lock().unwrap();
1093        trace!("Set username: {} from {}", username, *current);
1094        *current = username.to_string();
1095    }
1096    /// Return value of the username string
1097    #[tracing::instrument(skip_all)]
1098    fn get_username(&self) -> String {
1099        let username = self.username.lock().unwrap();
1100        username.to_string()
1101    }
1102    /// Set the password value
1103    #[tracing::instrument(skip_all)]
1104    pub fn set_password(&self, password: &str) {
1105        let mut current = self.password.lock().unwrap();
1106        trace!("Set password: {} from {}", password, *current);
1107        *current = password.to_string();
1108    }
1109    /// Return value of the password string
1110    #[tracing::instrument(skip_all)]
1111    fn get_password(&self) -> String {
1112        let password = self.password.lock().unwrap();
1113        password.to_string()
1114    }
1115    /// Set the jwt flag to true or false
1116    #[tracing::instrument(skip_all)]
1117    pub fn set_jwt(&self, jwt: &str) {
1118        let mut current = self.jwt.lock().unwrap();
1119        trace!("Set jwt: {} from {}", jwt, *current);
1120        *current = jwt.to_string();
1121    }
1122    /// Return value of the jwt string
1123    #[tracing::instrument(skip_all)]
1124    fn get_jwt(&self) -> String {
1125        let jwt = self.jwt.lock().unwrap();
1126        jwt.to_string()
1127    }
1128    
1129    /// Set the service name
1130    #[tracing::instrument(skip_all)]
1131    pub fn set_service_name(&self, service_name: &str) {
1132        let mut current = self.service_name.lock().unwrap();
1133        trace!("Set servicename: {} from {}", service_name, *current);
1134        *current = service_name.to_string();
1135    }
1136    /// Return value of the service name string
1137    #[tracing::instrument(skip_all)]
1138    pub fn get_service_name(&self) -> String {
1139        let servicename = self.service_name.lock().unwrap();
1140        servicename.to_string()
1141    }
1142    /// Set the agent name
1143    #[tracing::instrument(skip_all)]
1144    pub fn set_agent_name(&self, agent: &str) {
1145        let mut current = self.agent_name.lock().unwrap();
1146        trace!("Set agent: {} from {}", agent, *current);
1147        *current = agent.to_string();
1148    }
1149    /// Return value of the agent string
1150    #[tracing::instrument(skip_all)]
1151    pub fn get_agent_name(&self) -> String {
1152        let agent = self.agent_name.lock().unwrap();
1153        agent.to_string()
1154    }
1155    /// Set the agent version number
1156    #[tracing::instrument(skip_all)]
1157    pub fn set_agent_version(&self, version: &str) {
1158        let mut current = self.agent_version.lock().unwrap();
1159        trace!("Set agent version: {} from {}", version, *current);
1160        *current = version.to_string();
1161    }
1162    /// Return value of the agent version string
1163    #[tracing::instrument(skip_all)]
1164    pub fn get_agent_version(&self) -> String {
1165        let agent_version = self.agent_version.lock().unwrap();
1166        agent_version.to_string()
1167    }
1168    
1169    /// Set the config flag to true or false
1170    #[tracing::instrument(skip_all)]
1171    pub fn set_config(&self, config: Option<Config>) {
1172        let mut current = self.config.lock().unwrap();
1173        *current = config;
1174    }
1175    /// Return value of the config 
1176    #[tracing::instrument(skip_all)]
1177    pub fn get_config(&self) -> Option<Config> {
1178        let config = self.config.lock().unwrap();
1179        config.clone()
1180    }
1181    /// Set the client flag to true or false
1182    #[tracing::instrument(skip_all)]
1183    pub fn set_client(&self, client: ClientEnum) {
1184        let mut current = self.client.lock().unwrap();
1185        *current = client;
1186    }
1187    /// Return value of the client
1188    #[tracing::instrument(skip_all)]
1189    fn get_client(&self) -> ClientEnum {
1190        let client = self.client.lock().unwrap();
1191        client.clone()
1192    }
1193    /// Set the user flag to true or false
1194    #[tracing::instrument(skip_all)]
1195    pub fn set_user(&self, user: Option<User>) {
1196        let mut current = self.user.lock().unwrap();
1197        *current = user;
1198    }
1199    /// Return value of the user
1200    #[tracing::instrument(skip_all)]
1201    pub fn get_user(&self) -> Option<User> {
1202        let user = self.user.lock().unwrap();
1203        user.clone()
1204    }
1205    // /// Return the signed in user, if we are signed in.
1206    // #[tracing::instrument(skip_all)]
1207    // pub fn get_user(&self) -> Option<User> {
1208    //     // let inner = self.inner.lock().await;
1209    //     // inner.user.clone()
1210    //     self.user.clone()
1211    // }
1212    
1213    /// Set the runtime flag to true or false
1214    #[tracing::instrument(skip_all)]
1215    pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1216        let mut current = self.runtime.lock().unwrap();
1217        *current = runtime;
1218    }
1219    /// Return value of the runtime
1220    #[tracing::instrument(skip_all)]
1221    // pub fn get_runtime(&self) -> Option<Arc<tokio::runtime::Runtime>> {
1222    pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1223        self.runtime.as_ref()
1224    }
1225    /// Return value of the runtime handle
1226    #[tracing::instrument(skip_all)]
1227    pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1228        let mut rt = self.runtime.lock().unwrap();
1229        if rt.is_none() {
1230            // println!("Rust: Initializing new Tokio runtime");
1231            let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1232            *rt = Some(runtime);
1233        } else {
1234            // println!("Rust: Runtime already initialized");
1235        }
1236        rt.as_ref().unwrap().handle().clone()
1237    }
1238    /// Method to allow the user to subscribe with a callback function
1239    #[tracing::instrument(skip_all)]
1240    pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1241    {
1242        // call the callback function every time there is an event in the client.event_receiver
1243        let event_receiver = self.event_receiver.clone();
1244        let callback = callback;
1245        let _handle =  tokio::task::spawn(async move {
1246            while let Ok(event) = event_receiver.recv().await {
1247                callback(event);
1248            }
1249        }); // .unwrap();
1250    }
1251    /// Internal function, used to generate a unique id for each message sent to the server.
1252    #[tracing::instrument(skip_all)]
1253    pub fn get_uniqueid() -> String {
1254        static COUNTER: AtomicUsize = AtomicUsize::new(1);
1255        let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1256        let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1257        let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1258        let sqids = Sqids::default();
1259        sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1260    }
1261    /// Internal function, Send a message to the OpenIAP server, and wait for a response.
1262    #[tracing::instrument(skip_all)]
1263    async fn send(&self, msg: Envelope, timeout: Option<tokio::time::Duration>) -> Result<Envelope, OpenIAPError> {
1264        let response = self.send_noawait(msg).await;
1265        match response {
1266            Ok((response_rx, id)) => {
1267                let timeout = match timeout {
1268                    Some(t) => t,
1269                    None => self.get_default_timeout()
1270                };
1271                let result = tokio::time::timeout(timeout, response_rx).await;
1272                // Remove the entry from `inner.queries` after awaiting
1273                let inner = self.inner.lock().await;
1274                inner.queries.lock().await.remove(&id);
1275
1276                match result {
1277                    Ok(Ok(response)) => Ok(response),
1278                    Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
1279                    Err(_) => Err(OpenIAPError::ClientError("Request timed out".to_string())),
1280                }
1281                // // Await the response
1282                // let response = response_rx.await;
1283    
1284                // // Remove the entry from `inner.queries` after awaiting
1285                // let inner = self.inner.lock().await;
1286                // inner.queries.lock().await.remove(&id);
1287    
1288                // // Handle the result of the await
1289                // match response {
1290                //     Ok(response) => Ok(response),
1291                //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1292                // }
1293            }
1294            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1295        }
1296    }
1297    /// Internal function, Send a message to the OpenIAP server, and do not wait for a response.
1298    /// used when sending a stream of data, or when we do not need a response.
1299    #[tracing::instrument(skip_all)]
1300    async fn send_noawait(
1301        &self,
1302        mut msg: Envelope,
1303    ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1304        let (response_tx, response_rx) = oneshot::channel();
1305        let id = Client::get_uniqueid();
1306        msg.id = id.clone();
1307    
1308        // Lock and insert the sender into `inner.queries`
1309        {
1310            let inner = self.inner.lock().await;
1311            inner.queries.lock().await.insert(id.clone(), response_tx);
1312        }
1313    
1314        // Send the message and check for errors
1315        let res = self.send_envelope(msg).await;
1316        if let Err(e) = res {
1317            // Remove the entry from `inner.queries` if the send fails
1318            let inner = self.inner.lock().await;
1319            inner.queries.lock().await.remove(&id);
1320            return Err(OpenIAPError::ClientError(e.to_string()));
1321        }
1322    
1323        Ok((response_rx, id))
1324    }
1325    /// Internal function, Setup a new stream, send a message to the OpenIAP server, and return a stream to send and receive data.
1326    #[tracing::instrument(skip_all)]
1327    async fn sendwithstream(
1328        &self,
1329        mut msg: Envelope,
1330    ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1331        let (response_tx, response_rx) = oneshot::channel();
1332        let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1333        let id = Client::get_uniqueid();
1334        msg.id = id.clone();
1335        {
1336            let inner = self.inner.lock().await;
1337            inner.queries.lock().await.insert(id.clone(), response_tx);
1338            inner.streams.lock().await.insert(id.clone(), stream_tx);
1339            let res = self.send_envelope(msg).await;
1340            match res {
1341                Ok(_) => (),
1342                Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1343            }
1344        }
1345        Ok((response_rx, stream_rx))
1346    }
1347    #[tracing::instrument(skip_all, target = "openiap::client")]
1348    async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1349        if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin ) 
1350            && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1351            return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1352        }
1353        let env = envelope.clone();
1354        let command = envelope.command.clone();
1355        self.stats.lock().unwrap().package_tx += 1;
1356        match command.as_str() {
1357            "signin" => { self.stats.lock().unwrap().signin += 1;},
1358            "upload" => { self.stats.lock().unwrap().upload += 1;},
1359            "download" => { self.stats.lock().unwrap().download += 1;},
1360            "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1361            "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1362            "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1363            "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1364            "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1365            "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1366            "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1367
1368            "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1369            "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1370            "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1371            "watch" => { self.stats.lock().unwrap().watch += 1;},
1372            "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1373            "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1374
1375            "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1376            "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1377            "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1378            "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1379            "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1380            "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1381            "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1382            "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1383
1384            "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1385            "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1386            "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1387            "query" => { self.stats.lock().unwrap().query += 1;},
1388            "count" => { self.stats.lock().unwrap().count += 1;},
1389            "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1390            "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1391            "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1392            "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1393            "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1394            "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1395            "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1396            "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1397            "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1398            "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1399            _ => {}
1400        };
1401        if envelope.id.is_empty() {
1402            let id = Client::get_uniqueid();
1403            envelope.id = id.clone();
1404        }
1405        trace!("Sending {} message, in the thread", command);
1406        let res = self.out_envelope_sender.send(env).await;
1407        if res.is_err() {
1408            error!("{:?}", res);
1409            let errmsg = res.unwrap_err().to_string();
1410            self.set_connected(ClientState::Disconnected, Some(&errmsg));
1411            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1412        } else {
1413            return Ok(())
1414        }
1415    }
1416    #[tracing::instrument(skip_all, target = "openiap::client")]
1417    async fn parse_incomming_envelope(&self, received: Envelope) {
1418        self.stats.lock().unwrap().package_rx += 1;
1419        let command = received.command.clone();
1420        trace!("parse_incomming_envelope, command: {}", command);
1421        let inner = self.inner.lock().await;
1422        let rid = received.rid.clone();
1423        let mut queries = inner.queries.lock().await;
1424        let mut streams = inner.streams.lock().await;
1425        let watches = inner.watches.lock().await;
1426        let queues = inner.queues.lock().await;
1427    
1428        if command != "ping" && command != "pong" && command != "refreshtoken" {
1429            if rid.is_empty() {
1430                debug!("Received #{} #{} {} message", received.seq, received.id, command);
1431            } else {
1432                debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1433            }
1434        } else if rid.is_empty() {
1435            trace!("Received #{} #{} {} message", received.seq, received.id, command);
1436        } else {
1437            trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1438        }
1439        
1440        if command == "ping" {
1441            self.pong(&received.id).await;
1442            // self.event_sender.send(crate::ClientEvent::Ping).await.unwrap();
1443        } else if command == "refreshtoken" {
1444            // TODO: Do we store the new jwt at some point in the future
1445        } else if command == "beginstream"
1446            || command == "stream"
1447            || command == "endstream"
1448        {
1449            let streamresponse: Stream =
1450                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1451            let streamdata = streamresponse.data;
1452            if !streamdata.is_empty() {
1453                let stream = streams.get(rid.as_str()).unwrap();
1454    
1455                match stream.send(streamdata).await {
1456                    Ok(_) => _ = (),
1457                    Err(e) => error!("Failed to send data: {}", e),
1458                }
1459            }
1460            if command == "endstream" {
1461                let _ = streams.remove(rid.as_str());
1462            }
1463        } else if command == "watchevent" {
1464            let watchevent: WatchEvent =
1465                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1466            if let Some(callback) = watches.get(watchevent.id.as_str()) {
1467                callback(watchevent);
1468            }
1469        } else if command == "queueevent" {
1470            let queueevent: QueueEvent =
1471                prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1472            if let Some(callback) = queues.get(queueevent.queuename.as_str()).cloned() {
1473                let queuename = queueevent.replyto.clone();
1474                let correlation_id = queueevent.correlation_id.clone();
1475                let me = self.clone();
1476                tokio::spawn(async move {
1477                    let result_fut = callback(Arc::new(me.clone()), queueevent);
1478                    let result = result_fut.await;
1479                    if result.is_some() && !queuename.is_empty() {
1480                        debug!("Sending return value from queue event callback to {}", queuename);
1481                        let result = result.unwrap();
1482                        let q = QueueMessageRequest {
1483                            queuename,
1484                            correlation_id,
1485                            data: result,
1486                            striptoken: true,
1487                            ..Default::default()
1488                        };
1489                        let e = q.to_envelope();
1490                        let send_result = me.send(e, None).await;
1491                        if let Err(e) = send_result {
1492                            error!("Failed to send queue event response: {}", e);
1493                        }
1494                    }
1495                });
1496            }
1497        } else if let Some(response_tx) = queries.remove(&rid) {
1498            let stream = streams.get(rid.as_str());
1499            if let Some(stream) = stream {
1500                let streamdata = vec![];
1501                match stream.send(streamdata).await {
1502                    Ok(_) => _ = (),
1503                    Err(e) => error!("Failed to send data: {}", e),
1504                }
1505            }
1506            let _ = response_tx.send(received);
1507        } else {
1508            error!("Received unhandled {} message: {:?}", command, received);
1509        }    
1510    }    
1511    /// Internal function, used to send a fake getelement to the OpenIAP server.
1512    #[tracing::instrument(skip_all)]
1513    async fn get_element(&self) -> Result<(), OpenIAPError> {
1514        let id = Client::get_uniqueid();
1515        let envelope = Envelope {
1516            id: id.clone(),
1517            command: "getelement".into(),
1518            ..Default::default()
1519        };
1520        let result = match self.send(envelope, None).await {
1521            Ok(res) => res,
1522            Err(e) => {
1523                return Err(e);
1524            },
1525        };
1526        if result.command == "pong" || result.command == "getelement" {
1527            Ok(())
1528        } else if result.command == "error" {
1529            let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1530            Err(OpenIAPError::ServerError(e.message))
1531        } else {
1532            Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1533        }
1534    }
1535    /// Internal function, used to send a ping to the OpenIAP server.
1536    #[tracing::instrument(skip_all)]
1537    async fn ping(&self) -> Result<(), OpenIAPError> {
1538        let id = Client::get_uniqueid();
1539        let envelope = Envelope {
1540            id: id.clone(),
1541            command: "getelement".into(),
1542            ..Default::default()
1543        };
1544        match self.send_envelope(envelope).await {
1545            Ok(_res) => Ok(()),
1546            Err(e) => {
1547                return Err(e);
1548            },
1549        }
1550    }
1551    /// Internal function, used to send a pong response to the OpenIAP server.
1552    #[tracing::instrument(skip_all)]
1553    async fn pong(&self, rid: &str) {
1554        let id = Client::get_uniqueid();
1555        let envelope = Envelope {
1556            id: id.clone(),
1557            command: "pong".into(),
1558            rid: rid.to_string(),
1559            ..Default::default()
1560        };
1561        match self.send_envelope(envelope).await {
1562            Ok(_) => (),
1563            Err(e) => error!("Failed to send pong: {}", e),
1564        }
1565    }
1566    /// Sign in to the OpenIAP service. \
1567    /// If no username and password is provided, it will attempt to use environment variables.\
1568    /// if config is set to validateonly, it will only validate the credentials, but not sign in.\
1569    /// If no jwt, username and password is provided, it will attempt to use environment variables.\
1570    /// will prefere OPENIAP_JWT (or jwt) over OPENIAP_USERNAME and OPENIAP_PASSWORD.
1571    #[tracing::instrument(skip_all)]
1572    pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1573        // autodetect how to signin using environment variables
1574        if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1575            if config.jwt.is_empty() {
1576                config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1577            }
1578            if config.jwt.is_empty() {
1579                config.jwt = std::env::var("jwt").unwrap_or_default();
1580            }
1581            // if no jwt was found, test for username and password
1582            if config.jwt.is_empty() {
1583                if config.username.is_empty() {
1584                    config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1585                }
1586                if config.password.is_empty() {
1587                    config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1588                }
1589            }
1590        }
1591        let version = env!("CARGO_PKG_VERSION");
1592        if !version.is_empty() && config.version.is_empty() {
1593            config.version = version.to_string();
1594        }
1595        if config.agent.is_empty() {
1596            config.agent = self.get_agent_name();
1597        }
1598
1599        // trace!("Attempting sign-in using {:?}", config);
1600        let envelope = config.to_envelope();
1601        let result = self.send(envelope, None).await;
1602
1603        match &result {
1604            Ok(m) => {
1605                debug!("Sign-in reply received");
1606                if m.command == "error" {
1607                    let e: ErrorResponse =
1608                        prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1609                            .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1610                    return Err(OpenIAPError::ServerError(e.message));
1611                }
1612                debug!("Sign-in successful");
1613                let response: SigninResponse =
1614                    prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1615                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1616                if !config.validateonly {
1617                    self.set_connected(ClientState::Signedin, None);
1618                    self.set_user(Some(response.user.as_ref().unwrap().clone()));
1619                }
1620                Ok(response)
1621            }
1622            Err(e) => {
1623                debug!("Sending Sign-in request failed {:?}", result);
1624                debug!("Sign-in failed: {}", e.to_string());
1625                if !config.validateonly {
1626                    self.set_user(None);
1627                }
1628                Err(OpenIAPError::ClientError(e.to_string()))
1629            }
1630        }
1631    }
1632    /// Return a list of collections in the database
1633    /// - includehist: include historical collections, default is false.
1634    /// please see create_collection for examples on how to create collections.
1635    #[tracing::instrument(skip_all)]
1636    pub async fn list_collections(&self, includehist: bool, env: EnvConfig) -> Result<String, OpenIAPError> {
1637        let config = ListCollectionsRequest::new(includehist);
1638        let mut envelope = config.to_envelope();
1639        if !env.jwt.is_empty() {
1640            envelope.jwt = env.jwt;
1641        }
1642        if !env.spanid.is_empty() {
1643            envelope.spanid = env.spanid;
1644        }
1645        if !env.traceid.is_empty() {
1646            envelope.traceid = env.traceid;
1647        }
1648        let result = self.send(envelope, None).await;
1649        match result {
1650            Ok(m) => {
1651                let data = match m.data {
1652                    Some(data) => data,
1653                    None => {
1654                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1655                    }
1656                };
1657                if m.command == "error" {
1658                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1659                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1660                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1661                }
1662                let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1663                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1664                Ok(response.results)
1665            }
1666            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1667        }
1668    }
1669    /// Create a new collection in the database.
1670    /// You can create a collection by simply adding a new document to it using [Client::insert_one].
1671    /// Or you can create a collecting using the following example:
1672    /// ```
1673    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1674    /// #[tokio::main]
1675    /// async fn main() -> Result<(), OpenIAPError> {
1676    ///     let client = Client::new_connect("").await?;
1677    ///     //let collections = client.list_collections(false).await?;
1678    ///     //println!("Collections: {}", collections);
1679    ///     let config = CreateCollectionRequest::byname("rusttestcollection");
1680    ///     client.create_collection(config).await?;
1681    ///     let config = DropCollectionRequest::byname("rusttestcollection");
1682    ///     client.drop_collection(config).await?;
1683    ///     Ok(())
1684    /// }
1685    /// ```
1686    /// You can create a normal collection with a TTL index on the _created field, using the following example:
1687    /// ```
1688    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1689    /// #[tokio::main]
1690    /// async fn main() -> Result<(), OpenIAPError> {
1691    ///     let client = Client::new_connect("").await?;
1692    ///     let config = CreateCollectionRequest::with_ttl(
1693    ///         "rusttestttlcollection",
1694    ///         60
1695    ///     );
1696    ///     client.create_collection(config).await?;
1697    ///     let config = DropCollectionRequest::byname("rusttestttlcollection");
1698    ///     client.drop_collection(config).await?;
1699    ///     Ok(())
1700    /// }
1701    /// ```
1702    /// You can create a time series collection using the following example:
1703    /// granularity can be one of: seconds, minutes, hours
1704    /// ```
1705    /// use openiap_client::{Client, CreateCollectionRequest, DropCollectionRequest, OpenIAPError};
1706    /// #[tokio::main]
1707    /// async fn main() -> Result<(), OpenIAPError> {
1708    ///     let client = Client::new_connect("").await?;
1709    ///     let config = CreateCollectionRequest::timeseries(
1710    ///         "rusttesttscollection2",
1711    ///         "_created",
1712    ///         "minutes"
1713    ///     );
1714    ///     client.create_collection(config).await?;
1715    ///     let config = DropCollectionRequest::byname("rusttesttscollection2");
1716    ///     client.drop_collection(config).await?;
1717    ///     Ok(())
1718    /// }
1719    /// ```
1720    #[tracing::instrument(skip_all)]
1721    pub async fn create_collection(
1722        &self,
1723        config: CreateCollectionRequest,
1724        env: EnvConfig,
1725    ) -> Result<(), OpenIAPError> {
1726        if config.collectionname.is_empty() {
1727            return Err(OpenIAPError::ClientError(
1728                "No collection name provided".to_string(),
1729            ));
1730        }
1731        let mut envelope = config.to_envelope();
1732        if !env.jwt.is_empty() {
1733            envelope.jwt = env.jwt;
1734        }
1735        if !env.spanid.is_empty() {
1736            envelope.spanid = env.spanid;
1737        }
1738        if !env.traceid.is_empty() {
1739            envelope.traceid = env.traceid;
1740        }
1741        let result = self.send(envelope, None).await;
1742        match result {
1743            Ok(m) => {
1744                let data = match m.data {
1745                    Some(data) => data,
1746                    None => {
1747                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1748                    }
1749                };
1750                if m.command == "error" {
1751                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1752                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1753                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1754                }
1755                Ok(())
1756            }
1757            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1758        }
1759    }
1760    /// Drop a collection from the database, this will delete all data and indexes for the collection.
1761    /// See [Client::create_collection] for examples on how to create a collection.
1762    #[tracing::instrument(skip_all)]
1763    pub async fn drop_collection(&self, config: DropCollectionRequest, env: EnvConfig) -> Result<(), OpenIAPError> {
1764        if config.collectionname.is_empty() {
1765            return Err(OpenIAPError::ClientError(
1766                "No collection name provided".to_string(),
1767            ));
1768        }
1769        let mut envelope = config.to_envelope();
1770        if !env.jwt.is_empty() {
1771            envelope.jwt = env.jwt;
1772        }
1773        if !env.spanid.is_empty() {
1774            envelope.spanid = env.spanid;
1775        }
1776        if !env.traceid.is_empty() {
1777            envelope.traceid = env.traceid;
1778        }
1779        let result = self.send(envelope, None).await;
1780        match result {
1781            Ok(m) => {
1782                let data = match m.data {
1783                    Some(data) => data,
1784                    None => {
1785                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1786                    }
1787                };
1788                if m.command == "error" {
1789                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1790                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1791                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1792                }
1793                Ok(())
1794            }
1795            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1796        }
1797    }
1798    /// Return all indexes for a collection in the database
1799    /// ```
1800    /// use openiap_client::{Client, GetIndexesRequest, OpenIAPError};
1801    /// #[tokio::main]
1802    /// async fn main() -> Result<(), OpenIAPError> {
1803    ///     let client = Client::new_connect("").await?;
1804    ///     let config = GetIndexesRequest::bycollectionname("rustindextestcollection");
1805    ///     let indexes = client.get_indexes(config).await?;
1806    ///     println!("Indexes: {}", indexes);
1807    ///     Ok(())
1808    /// }
1809    /// ```
1810    ///
1811    pub async fn get_indexes(&self, config: GetIndexesRequest, env: EnvConfig) -> Result<String, OpenIAPError> {
1812        if config.collectionname.is_empty() {
1813            return Err(OpenIAPError::ClientError(
1814                "No collection name provided".to_string(),
1815            ));
1816        }
1817        let mut envelope = config.to_envelope();
1818        if !env.jwt.is_empty() {
1819            envelope.jwt = env.jwt;
1820        }
1821        if !env.spanid.is_empty() {
1822            envelope.spanid = env.spanid;
1823        }
1824        if !env.traceid.is_empty() {
1825            envelope.traceid = env.traceid;
1826        }
1827        let result = self.send(envelope, None).await;
1828        match result {
1829            Ok(m) => {
1830                let data = match m.data {
1831                    Some(data) => data,
1832                    None => {
1833                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1834                    }
1835                };
1836                if m.command == "error" {
1837                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1838                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1839                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1840                }
1841                let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1842                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1843                Ok(response.results)
1844            }
1845            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1846        }
1847    }
1848    /// Create an index in the database.
1849    /// Example of creating an index on the name field in the rustindextestcollection collection, and then dropping it again:
1850    /// ```
1851    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1852    /// #[tokio::main]
1853    /// async fn main() -> Result<(), OpenIAPError> {
1854    ///     let client = Client::new_connect("").await?;
1855    ///     let config = CreateIndexRequest::bycollectionname(
1856    ///         "rustindextestcollection",
1857    ///         "{\"name\": 1}"
1858    ///     );
1859    ///     client.create_index(config).await?;
1860    ///     let config = DropIndexRequest::bycollectionname(
1861    ///         "rustindextestcollection",
1862    ///         "name_1"
1863    ///     );
1864    ///     client.drop_index(config).await?;
1865    ///     Ok(())
1866    /// }
1867    /// ```
1868    /// Example of creating an unique index on the address field in the rustindextestcollection collection, and then dropping it again:
1869    /// ```
1870    /// use openiap_client::{Client, DropIndexRequest, CreateIndexRequest, OpenIAPError};
1871    /// #[tokio::main]
1872    /// async fn main() -> Result<(), OpenIAPError> {
1873    ///     let client = Client::new_connect("").await?;
1874    ///     let mut config = CreateIndexRequest::bycollectionname(
1875    ///         "rustindextestcollection",
1876    ///         "{\"address\": 1}"
1877    ///     );
1878    ///     config.options = "{\"unique\": true}".to_string();
1879    ///     client.create_index(config).await?;
1880    ///     let config = DropIndexRequest::bycollectionname(
1881    ///         "rustindextestcollection",
1882    ///         "address_1"
1883    ///     );
1884    ///     client.drop_index(config).await?;
1885    ///     Ok(())
1886    /// }
1887    /// ```
1888    pub async fn create_index(&self, config: CreateIndexRequest, env: EnvConfig) -> Result<(), OpenIAPError> {
1889        if config.collectionname.is_empty() {
1890            return Err(OpenIAPError::ClientError(
1891                "No collection name provided".to_string(),
1892            ));
1893        }
1894        if config.index.is_empty() {
1895            return Err(OpenIAPError::ClientError(
1896                "No index was provided".to_string(),
1897            ));
1898        }
1899        let mut envelope = config.to_envelope();
1900        if !env.jwt.is_empty() {
1901            envelope.jwt = env.jwt;
1902        }
1903        if !env.spanid.is_empty() {
1904            envelope.spanid = env.spanid;
1905        }
1906        if !env.traceid.is_empty() {
1907            envelope.traceid = env.traceid;
1908        }
1909        let result = self.send(envelope, None).await;
1910        match result {
1911            Ok(m) => {
1912                let data = match m.data {
1913                    Some(data) => data,
1914                    None => {
1915                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1916                    }
1917                };
1918                if m.command == "error" {
1919                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1920                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1921                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1922                }
1923                Ok(())
1924            }
1925            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1926        }
1927    }
1928    /// Drop an index from the database
1929    /// See [Client::create_index] for an example on how to create and drop an index.
1930    pub async fn drop_index(&self, config: DropIndexRequest, env: EnvConfig) -> Result<(), OpenIAPError> {
1931        if config.collectionname.is_empty() {
1932            return Err(OpenIAPError::ClientError(
1933                "No collection name provided".to_string(),
1934            ));
1935        }
1936        if config.name.is_empty() {
1937            return Err(OpenIAPError::ClientError(
1938                "No index name provided".to_string(),
1939            ));
1940        }
1941        let mut envelope = config.to_envelope();
1942        if !env.jwt.is_empty() {
1943            envelope.jwt = env.jwt;
1944        }
1945        if !env.spanid.is_empty() {
1946            envelope.spanid = env.spanid;
1947        }
1948        if !env.traceid.is_empty() {
1949            envelope.traceid = env.traceid;
1950        }
1951        let result = self.send(envelope, None).await;
1952        match result {
1953            Ok(m) => {
1954                let data = match m.data {
1955                    Some(data) => data,
1956                    None => {
1957                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
1958                    }
1959                };
1960                if m.command == "error" {
1961                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1962                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1963                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1964                }
1965                Ok(())
1966            }
1967            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1968        }
1969    }
1970    /// To query all documents in the entities collection where _type is test, you can use the following example:
1971    /// ```
1972    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1973    /// #[tokio::main]
1974    /// async fn main() -> Result<(), OpenIAPError> {
1975    ///     let client = Client::new_connect("").await?;
1976    ///     let q = client.query( QueryRequest::with_query(
1977    ///         "entities",
1978    ///         "{\"_type\":\"test\"}"
1979    ///     )).await?;
1980    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
1981    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1982    ///     for item in items {
1983    ///         println!("Item: {:?}", item);
1984    ///     }
1985    ///     Ok(())
1986    /// }
1987    /// ```
1988    /// To query all documents in the entities collection, and only return the name and _id field for all documents, you can use the following example:
1989    /// ```
1990    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
1991    /// #[tokio::main]
1992    /// async fn main() -> Result<(), OpenIAPError> {
1993    ///     let client = Client::new_connect("").await?;
1994    ///     let q = client.query( QueryRequest::with_projection(
1995    ///         "entities",
1996    ///         "{}",
1997    ///         "{\"name\":1}"
1998    ///     )).await?;
1999    ///     let items: serde_json::Value = serde_json::from_str(&q.results).unwrap();
2000    ///     let items: &Vec<serde_json::Value> = items.as_array().unwrap();
2001    ///     for item in items {
2002    ///         println!("Item: {:?}", item);
2003    ///     }
2004    ///     Ok(())
2005    /// }
2006    /// ```
2007    #[tracing::instrument(skip_all)]
2008    pub async fn query(&self, mut config: QueryRequest, env: EnvConfig) -> Result<QueryResponse, OpenIAPError> {
2009        if config.collectionname.is_empty() {
2010            config.collectionname = "entities".to_string();
2011        }
2012        let mut envelope = config.to_envelope();
2013        if !env.jwt.is_empty() {
2014            envelope.jwt = env.jwt;
2015        }
2016        if !env.spanid.is_empty() {
2017            envelope.spanid = env.spanid;
2018        }
2019        if !env.traceid.is_empty() {
2020            envelope.traceid = env.traceid;
2021        }
2022        let result = self.send(envelope, None).await;
2023        match result {
2024            Ok(m) => {
2025                let data = match m.data {
2026                    Some(data) => data,
2027                    None => {
2028                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2029                    }
2030                };
2031                if m.command == "error" {
2032                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2033                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2034                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2035                }
2036                let response: QueryResponse = prost::Message::decode(data.value.as_ref())
2037                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2038                debug!("Return Ok(response)");
2039                Ok(response)
2040            }
2041            Err(e) => {
2042                debug!("Error !!");
2043                Err(OpenIAPError::ClientError(e.to_string()))
2044            }
2045        }
2046    }
2047    /// Try and get a single document from the database.\
2048    /// If no document is found, it will return None.
2049    /// ```
2050    /// use openiap_client::{OpenIAPError, Client, QueryRequest};
2051    /// #[tokio::main]
2052    /// async fn main() -> Result<(), OpenIAPError> {
2053    ///     let client = Client::new_connect("").await?;
2054    ///     let config = QueryRequest::with_query(
2055    ///         "users",
2056    ///         "{\"username\":\"guest\"}"
2057    ///     );
2058    ///     let item = client.get_one(config).await;
2059    ///     match item {
2060    ///         Some(item) => {
2061    ///             assert_eq!(item["username"], "guest");
2062    ///             println!("Item: {:?}", item);
2063    ///         }
2064    ///         None => {
2065    ///             println!("No item found");
2066    ///             assert!(false, "No item found");
2067    ///         }
2068    ///     }
2069    ///     Ok(())
2070    /// }
2071    /// ```
2072    #[tracing::instrument(skip_all)]
2073    pub async fn get_one(&self, mut config: QueryRequest, env: EnvConfig) -> Option<serde_json::Value> {
2074        if config.collectionname.is_empty() {
2075            config.collectionname = "entities".to_string();
2076        }
2077        config.top = 1;
2078        let mut envelope = config.to_envelope();
2079        if !env.jwt.is_empty() {
2080            envelope.jwt = env.jwt;
2081        }
2082        if !env.spanid.is_empty() {
2083            envelope.spanid = env.spanid;
2084        }
2085        if !env.traceid.is_empty() {
2086            envelope.traceid = env.traceid;
2087        }
2088        let result = self.send(envelope, None).await;
2089        match result {
2090            Ok(m) => {
2091                let data = match m.data {
2092                    Some(data) => data,
2093                    None => return None,
2094                };
2095                if m.command == "error" {
2096                    return None;
2097                }
2098                let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
2099
2100                let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
2101                let items: &Vec<serde_json::Value> = items.as_array().unwrap();
2102                if items.is_empty() {
2103                    return None;
2104                }
2105                let item = items[0].clone();
2106                Some(item)
2107            }
2108            Err(_) => None,
2109        }
2110    }
2111
2112    /// Try and get a specefic version of a document from the database, reconstructing it from the history collection
2113    /// ```
2114    /// use openiap_client::{OpenIAPError, Client, GetDocumentVersionRequest, InsertOneRequest, UpdateOneRequest};
2115    /// #[tokio::main]
2116    /// async fn main() -> Result<(), OpenIAPError> {
2117    ///     let client = Client::new_connect("").await?;
2118    ///     let item = "{\"name\": \"test from rust\", \"_type\": \"test\"}";
2119    ///     let query = InsertOneRequest {
2120    ///         collectionname: "entities".to_string(),
2121    ///         item: item.to_string(),
2122    ///         j: true,
2123    ///         w: 2,
2124    ///         ..Default::default()
2125    ///     };
2126    ///     let response = client.insert_one(query).await;
2127    ///     let response = match response {
2128    ///         Ok(r) => r,
2129    ///         Err(e) => {
2130    ///             println!("Error: {:?}", e);
2131    ///             assert!(false, "insert_one failed with {:?}", e);
2132    ///             return Ok(());
2133    ///         }
2134    ///     };
2135    ///     let _obj: serde_json::Value = serde_json::from_str(&response.result).unwrap();
2136    ///     let _id = _obj["_id"].as_str().unwrap();
2137    ///     let item = format!("{{\"name\":\"updated from rust\", \"_id\": \"{}\"}}", _id);
2138    ///     let query = UpdateOneRequest {
2139    ///         collectionname: "entities".to_string(),
2140    ///         item: item.to_string(),
2141    ///         ..Default::default()
2142    ///     };
2143    ///     let response = client.update_one(query).await;
2144    ///     _ = match response {
2145    ///         Ok(r) => r,
2146    ///         Err(e) => {
2147    ///             println!("Error: {:?}", e);
2148    ///             assert!(false, "update_one failed with {:?}", e);
2149    ///             return Ok(());
2150    ///         }
2151    ///     };
2152    ///     let query = GetDocumentVersionRequest {
2153    ///         collectionname: "entities".to_string(),
2154    ///         id: _id.to_string(),
2155    ///         version: 0,
2156    ///         ..Default::default()
2157    ///     };
2158    ///     let response = client.get_document_version(query).await;
2159    ///     let response = match response {
2160    ///         Ok(r) => r,
2161    ///         Err(e) => {
2162    ///             println!("Error: {:?}", e);
2163    ///             assert!(false, "get_document_version failed with {:?}", e);
2164    ///             return Ok(());
2165    ///         }
2166    ///     };
2167    ///     let _obj = serde_json::from_str(&response);
2168    ///     let _obj: serde_json::Value = match _obj {
2169    ///         Ok(r) => r,
2170    ///         Err(e) => {
2171    ///             println!("Error: {:?}", e);
2172    ///             assert!(
2173    ///                 false,
2174    ///                 "parse get_document_version result failed with {:?}",
2175    ///                 e
2176    ///             );
2177    ///             return Ok(());
2178    ///         }
2179    ///     };
2180    ///     let name = _obj["name"].as_str().unwrap();
2181    ///     let version = _obj["_version"].as_i64().unwrap();
2182    ///     println!("version 0 Name: {}, Version: {}", name, version);
2183    ///     assert_eq!(name, "test from rust");
2184    ///     let query = GetDocumentVersionRequest {
2185    ///         collectionname: "entities".to_string(),
2186    ///         id: _id.to_string(),
2187    ///         version: 1,
2188    ///         ..Default::default()
2189    ///     };
2190    ///     let response = client.get_document_version(query).await;
2191    ///     assert!(
2192    ///         response.is_ok(),
2193    ///         "test_get_document_version failed with {:?}",
2194    ///         response.err().unwrap()
2195    ///     );
2196    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2197    ///     let name = _obj["name"].as_str().unwrap();
2198    ///     let version = _obj["_version"].as_i64().unwrap();
2199    ///     println!("version 1 Name: {}, Version: {}", name, version);
2200    ///     assert_eq!(name, "updated from rust");
2201    ///     let query = GetDocumentVersionRequest {
2202    ///         collectionname: "entities".to_string(),
2203    ///         id: _id.to_string(),
2204    ///         version: -1,
2205    ///         ..Default::default()
2206    ///     };
2207    ///     let response = client.get_document_version(query).await;
2208    ///     assert!(
2209    ///         response.is_ok(),
2210    ///         "test_get_document_version failed with {:?}",
2211    ///         response.err().unwrap()
2212    ///     );
2213    ///     let _obj: serde_json::Value = serde_json::from_str(&response.unwrap()).unwrap();
2214    ///     let name = _obj["name"].as_str().unwrap();
2215    ///     let version = _obj["_version"].as_i64().unwrap();
2216    ///     println!("version -1 Name: {}, Version: {}", name, version);
2217    ///     assert_eq!(name, "updated from rust");
2218    ///     Ok(())
2219    /// }
2220    /// ```
2221    #[tracing::instrument(skip_all)]
2222    pub async fn get_document_version(
2223        &self,
2224        mut config: GetDocumentVersionRequest,
2225        env: EnvConfig,
2226    ) -> Result<String, OpenIAPError> {
2227        if config.collectionname.is_empty() {
2228            config.collectionname = "entities".to_string();
2229        }
2230        if config.id.is_empty() {
2231            return Err(OpenIAPError::ClientError("No id provided".to_string()));
2232        }
2233        let mut envelope = config.to_envelope();
2234        if !env.jwt.is_empty() {
2235            envelope.jwt = env.jwt;
2236        }
2237        if !env.spanid.is_empty() {
2238            envelope.spanid = env.spanid;
2239        }
2240        if !env.traceid.is_empty() {
2241            envelope.traceid = env.traceid;
2242        }
2243        let result = self.send(envelope, None).await;
2244        match result {
2245            Ok(m) => {
2246                let data = match m.data {
2247                    Some(data) => data,
2248                    None => {
2249                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2250                    }
2251                };
2252                if m.command == "error" {
2253                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2254                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2255                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2256                }
2257                let response: GetDocumentVersionResponse =
2258                    prost::Message::decode(data.value.as_ref())
2259                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2260                Ok(response.result)
2261            }
2262            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2263        }
2264    }
2265    /// Run an aggregate pipeline towards the database
2266    /// Example of running an aggregate pipeline on the entities collection, counting the number of documents with _type=test, and grouping them by name:
2267    /// ```
2268    /// use openiap_client::{OpenIAPError, Client, AggregateRequest};
2269    /// #[tokio::main]
2270    /// async fn main() -> Result<(), OpenIAPError> {
2271    ///    let client = Client::new_connect("").await?;
2272    ///     let config = AggregateRequest {
2273    ///         collectionname: "entities".to_string(),
2274    ///         aggregates: "[{\"$match\": {\"_type\": \"test\"}}, {\"$group\": {\"_id\": \"$name\", \"count\": {\"$sum\": 1}}}]".to_string(),
2275    ///         ..Default::default()
2276    ///     };
2277    ///     let response = client.aggregate(config).await?;
2278    ///     println!("Response: {:?}", response);
2279    ///     Ok(())
2280    /// }
2281    /// ```
2282    /// 
2283    #[tracing::instrument(skip_all)]
2284    pub async fn aggregate(
2285        &self,
2286        mut config: AggregateRequest,
2287        env: EnvConfig,
2288    ) -> Result<AggregateResponse, OpenIAPError> {
2289        if config.collectionname.is_empty() {
2290            config.collectionname = "entities".to_string();
2291        }
2292        if config.hint.is_empty() {
2293            config.hint = "".to_string();
2294        }
2295        if config.queryas.is_empty() {
2296            config.queryas = "".to_string();
2297        }
2298        if config.aggregates.is_empty() {
2299            return Err(OpenIAPError::ClientError(
2300                "No aggregates provided".to_string(),
2301            ));
2302        }
2303        let mut envelope = config.to_envelope();
2304        if !env.jwt.is_empty() {
2305            envelope.jwt = env.jwt;
2306        }
2307        if !env.spanid.is_empty() {
2308            envelope.spanid = env.spanid;
2309        }
2310        if !env.traceid.is_empty() {
2311            envelope.traceid = env.traceid;
2312        }
2313        let result = self.send(envelope, None).await;
2314        match result {
2315            Ok(m) => {
2316                let data = match m.data {
2317                    Some(data) => data,
2318                    None => {
2319                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2320                    }
2321                };
2322                if m.command == "error" {
2323                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2324                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2325                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2326                }
2327                let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2328                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2329                Ok(response)
2330            }
2331            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2332        }
2333    }
2334    /// Count the number of documents in a collection, with an optional query
2335    #[tracing::instrument(skip_all)]
2336    pub async fn count(&self, mut config: CountRequest, env: EnvConfig) -> Result<CountResponse, OpenIAPError> {
2337        if config.collectionname.is_empty() {
2338            config.collectionname = "entities".to_string();
2339        }
2340        if config.query.is_empty() {
2341            config.query = "{}".to_string();
2342        }
2343        let mut envelope = config.to_envelope();
2344        if !env.jwt.is_empty() {
2345            envelope.jwt = env.jwt;
2346        }
2347        if !env.spanid.is_empty() {
2348            envelope.spanid = env.spanid;
2349        }
2350        if !env.traceid.is_empty() {
2351            envelope.traceid = env.traceid;
2352        }
2353        let result = self.send(envelope, None).await;
2354        match result {
2355            Ok(m) => {
2356                let data = match m.data {
2357                    Some(data) => data,
2358                    None => {
2359                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2360                    }
2361                };
2362                if m.command == "error" {
2363                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2364                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2365                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2366                }
2367                let response: CountResponse = prost::Message::decode(data.value.as_ref())
2368                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2369                Ok(response)
2370            }
2371            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2372        }
2373    }
2374    /// Get distinct values for a field in a collection, with an optional query
2375    #[tracing::instrument(skip_all)]
2376    pub async fn distinct(
2377        &self,
2378        mut config: DistinctRequest,
2379        env: EnvConfig,
2380    ) -> Result<DistinctResponse, OpenIAPError> {
2381        if config.collectionname.is_empty() {
2382            config.collectionname = "entities".to_string();
2383        }
2384        if config.query.is_empty() {
2385            config.query = "{}".to_string();
2386        }
2387        if config.field.is_empty() {
2388            return Err(OpenIAPError::ClientError("No field provided".to_string()));
2389        }
2390        let mut envelope = config.to_envelope();
2391        if !env.jwt.is_empty() {
2392            envelope.jwt = env.jwt;
2393        }
2394        if !env.spanid.is_empty() {
2395            envelope.spanid = env.spanid;
2396        }
2397        if !env.traceid.is_empty() {
2398            envelope.traceid = env.traceid;
2399        }
2400        let result = self.send(envelope, None).await;
2401        match result {
2402            Ok(m) => {
2403                let data = match m.data {
2404                    Some(data) => data,
2405                    None => {
2406                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2407                    }
2408                };
2409                if m.command == "error" {
2410                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2411                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2412                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2413                }
2414                let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2415                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2416                Ok(response)
2417            }
2418            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2419        }
2420    }
2421    /// Insert a document into a collection
2422    #[tracing::instrument(skip_all)]
2423    pub async fn insert_one(
2424        &self,
2425        config: InsertOneRequest,
2426        env: EnvConfig,
2427    ) -> Result<InsertOneResponse, OpenIAPError> {
2428        let mut envelope = config.to_envelope();
2429        if !env.jwt.is_empty() {
2430            envelope.jwt = env.jwt;
2431        }
2432        if !env.spanid.is_empty() {
2433            envelope.spanid = env.spanid;
2434        }
2435        if !env.traceid.is_empty() {
2436            envelope.traceid = env.traceid;
2437        }
2438        let result = self.send(envelope, None).await;
2439        match result {
2440            Ok(m) => {
2441                let data = match m.data {
2442                    Some(data) => data,
2443                    None => {
2444                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2445                    }
2446                };
2447                if m.command == "error" {
2448                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2449                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2450                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2451                }
2452                let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2453                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2454                Ok(response)
2455            }
2456            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2457        }
2458    }
2459    /// Insert many documents into a collection
2460    #[tracing::instrument(skip_all)]
2461    pub async fn insert_many(
2462        &self,
2463        config: InsertManyRequest,
2464        env: EnvConfig,
2465    ) -> Result<InsertManyResponse, OpenIAPError> {
2466        let mut envelope = config.to_envelope();
2467        if !env.jwt.is_empty() {
2468            envelope.jwt = env.jwt;
2469        }
2470        if !env.spanid.is_empty() {
2471            envelope.spanid = env.spanid;
2472        }
2473        if !env.traceid.is_empty() {
2474            envelope.traceid = env.traceid;
2475        }
2476        let result = self.send(envelope, None).await;
2477        match result {
2478            Ok(m) => {
2479                let data = match m.data {
2480                    Some(data) => data,
2481                    None => {
2482                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2483                    }
2484                };
2485                if m.command == "error" {
2486                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2487                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2488                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2489                }
2490                let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2491                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2492                Ok(response)
2493            }
2494            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2495        }
2496    }
2497    /// Update ( replace ) a document in a collection
2498    #[tracing::instrument(skip_all)]
2499    pub async fn update_one(
2500        &self,
2501        config: UpdateOneRequest,
2502        env: EnvConfig,
2503    ) -> Result<UpdateOneResponse, OpenIAPError> {
2504        let mut envelope = config.to_envelope();
2505        if !env.jwt.is_empty() {
2506            envelope.jwt = env.jwt;
2507        }
2508        if !env.spanid.is_empty() {
2509            envelope.spanid = env.spanid;
2510        }
2511        if !env.traceid.is_empty() {
2512            envelope.traceid = env.traceid;
2513        }
2514        let result = self.send(envelope, None).await;
2515        match result {
2516            Ok(m) => {
2517                let data = match m.data {
2518                    Some(data) => data,
2519                    None => {
2520                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2521                    }
2522                };
2523                if m.command == "error" {
2524                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2525                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2526                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2527                }
2528                let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2529                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2530                Ok(response)
2531            }
2532            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2533        }
2534    }
2535    /// Using a unique key, insert a document or update it if it already exists ( upsert on steroids )
2536    #[tracing::instrument(skip_all)]
2537    pub async fn insert_or_update_one(
2538        &self,
2539        config: InsertOrUpdateOneRequest,
2540        env: EnvConfig,
2541    ) -> Result<String, OpenIAPError> {
2542        let mut envelope = config.to_envelope();
2543        if !env.jwt.is_empty() {
2544            envelope.jwt = env.jwt;
2545        }
2546        if !env.spanid.is_empty() {
2547            envelope.spanid = env.spanid;
2548        }
2549        if !env.traceid.is_empty() {
2550            envelope.traceid = env.traceid;
2551        }
2552        let result = self.send(envelope, None).await;
2553        match result {
2554            Ok(m) => {
2555                let data = match m.data {
2556                    Some(data) => data,
2557                    None => {
2558                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2559                    }
2560                };
2561                if m.command == "error" {
2562                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2563                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2564                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2565                }
2566                let response: InsertOrUpdateOneResponse =
2567                    prost::Message::decode(data.value.as_ref())
2568                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2569                Ok(response.result)
2570            }
2571            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2572        }
2573    }
2574    /// Using a unique key, insert many documents or update them if they already exist ( upsert on steroids )
2575    #[tracing::instrument(skip_all)]
2576    pub async fn insert_or_update_many(
2577        &self,
2578        config: InsertOrUpdateManyRequest,
2579        env: EnvConfig,
2580    ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2581        let mut envelope = config.to_envelope();
2582        if !env.jwt.is_empty() {
2583            envelope.jwt = env.jwt;
2584        }
2585        if !env.spanid.is_empty() {
2586            envelope.spanid = env.spanid;
2587        }
2588        if !env.traceid.is_empty() {
2589            envelope.traceid = env.traceid;
2590        }
2591        let result = self.send(envelope, None).await;
2592        match result {
2593            Ok(m) => {
2594                let data = match m.data {
2595                    Some(data) => data,
2596                    None => {
2597                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2598                    }
2599                };
2600                if m.command == "error" {
2601                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2602                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2603                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2604                }
2605                let response: InsertOrUpdateManyResponse =
2606                    prost::Message::decode(data.value.as_ref())
2607                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2608                Ok(response)
2609            }
2610            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2611        }
2612    }
2613    /// Update one or more documents in a collection using a update document
2614    #[tracing::instrument(skip_all)]
2615    pub async fn update_document(
2616        &self,
2617        config: UpdateDocumentRequest,
2618        env: EnvConfig,
2619    ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2620        let mut envelope = config.to_envelope();
2621        if !env.jwt.is_empty() {
2622            envelope.jwt = env.jwt;
2623        }
2624        if !env.spanid.is_empty() {
2625            envelope.spanid = env.spanid;
2626        }
2627        if !env.traceid.is_empty() {
2628            envelope.traceid = env.traceid;
2629        }
2630        let result = self.send(envelope, None).await;
2631        match result {
2632            Ok(m) => {
2633                let data = match m.data {
2634                    Some(data) => data,
2635                    None => {
2636                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2637                    }
2638                };
2639                if m.command == "error" {
2640                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2641                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2642                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2643                }
2644                let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2645                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2646                Ok(response)
2647            }
2648            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2649        }
2650    }
2651    /// Delete a document from a collection using a unique key
2652    #[tracing::instrument(skip_all)]
2653    pub async fn delete_one(&self, config: DeleteOneRequest, env: EnvConfig) -> Result<i32, OpenIAPError> {
2654        let mut envelope = config.to_envelope();
2655        if !env.jwt.is_empty() {
2656            envelope.jwt = env.jwt;
2657        }
2658        if !env.spanid.is_empty() {
2659            envelope.spanid = env.spanid;
2660        }
2661        if !env.traceid.is_empty() {
2662            envelope.traceid = env.traceid;
2663        }
2664        let result = self.send(envelope, None).await;
2665        match result {
2666            Ok(m) => {
2667                let data = match m.data {
2668                    Some(data) => data,
2669                    None => {
2670                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2671                    }
2672                };
2673                if m.command == "error" {
2674                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2675                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2676                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2677                }
2678                let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2679                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2680                Ok(response.affectedrows)
2681            }
2682            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2683        }
2684    }
2685    /// Delete many documents from a collection using a query or list of unique keys
2686    #[tracing::instrument(skip_all)]
2687    pub async fn delete_many(&self, config: DeleteManyRequest, env: EnvConfig) -> Result<i32, OpenIAPError> {
2688        let mut envelope = config.to_envelope();
2689        if !env.jwt.is_empty() {
2690            envelope.jwt = env.jwt;
2691        }
2692        if !env.spanid.is_empty() {
2693            envelope.spanid = env.spanid;
2694        }
2695        if !env.traceid.is_empty() {
2696            envelope.traceid = env.traceid;
2697        }
2698        let result = self.send(envelope, None).await;
2699        match result {
2700            Ok(m) => {
2701                let data = match m.data {
2702                    Some(data) => data,
2703                    None => {
2704                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2705                    }
2706                };
2707                if m.command == "error" {
2708                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2709                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2710                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2711                }
2712                let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2713                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2714                Ok(response.affectedrows)
2715            }
2716            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2717        }
2718    }
2719    /// Download a file from the database
2720    #[tracing::instrument(skip_all)]
2721    pub async fn download(
2722        &self,
2723        config: DownloadRequest,
2724        env: EnvConfig,
2725        folder: Option<&str>,
2726        filename: Option<&str>,
2727    ) -> Result<DownloadResponse, OpenIAPError> {
2728        let mut envelope = config.to_envelope();
2729        if !env.jwt.is_empty() {
2730            envelope.jwt = env.jwt;
2731        }
2732        if !env.spanid.is_empty() {
2733            envelope.spanid = env.spanid;
2734        }
2735        if !env.traceid.is_empty() {
2736            envelope.traceid = env.traceid;
2737        }
2738        match self.sendwithstream(envelope).await {
2739            Ok((response_rx, mut stream_rx)) => {
2740                let temp_file_path = util::generate_unique_filename("openiap");
2741                debug!("Temp file: {:?}", temp_file_path);
2742                let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2743                    OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2744                })?;
2745                while !stream_rx.is_closed() {
2746                    match stream_rx.recv().await {
2747                        Some(received) => {
2748                            if received.is_empty() {
2749                                debug!("Stream closed");
2750                                break;
2751                            }
2752                            debug!("Received {} bytes", received.len());
2753                            temp_file.write_all(&received).map_err(|e| {
2754                                OpenIAPError::ClientError(format!(
2755                                    "Failed to write to temp file: {}",
2756                                    e
2757                                ))
2758                            })?;
2759                        }
2760                        None => {
2761                            debug!("Stream closed");
2762                            break;
2763                        }
2764                    }
2765                }
2766                temp_file.sync_all().map_err(|e| {
2767                    OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2768                })?;
2769
2770                let response = response_rx.await.map_err(|_| {
2771                    OpenIAPError::ClientError("Failed to receive response".to_string())
2772                })?;
2773
2774                if response.command == "error" {
2775                    let data = match response.data {
2776                        Some(data) => data,
2777                        None => {
2778                            return Err(OpenIAPError::ClientError(
2779                                "No data returned for SERVER error".to_string(),
2780                            ));
2781                        }
2782                    };
2783                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2784                    return Err(OpenIAPError::ServerError(e.message));
2785                }
2786                let mut downloadresponse: DownloadResponse =
2787                    prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2788
2789                let mut final_filename = match &filename {
2790                    Some(f) => f,
2791                    None => downloadresponse.filename.as_str(),
2792                };
2793                if final_filename.is_empty() {
2794                    final_filename = downloadresponse.filename.as_str();
2795                }
2796                let mut folder = match &folder {
2797                    Some(f) => f,
2798                    None => ".",
2799                };
2800                if folder.is_empty() {
2801                    folder = ".";
2802                }
2803                let filepath = format!("{}/{}", folder, final_filename);
2804                trace!("Moving file to {}", filepath);
2805                util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2806                    OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2807                })?;
2808                debug!("Downloaded file to {}", filepath);
2809                downloadresponse.filename = filepath;
2810
2811                Ok(downloadresponse)
2812            }
2813            Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2814        }
2815    }
2816    /// Upload a file to the database
2817    #[tracing::instrument(skip_all)]
2818    pub async fn upload(
2819        &self,
2820        config: UploadRequest,
2821        env: EnvConfig,
2822        filepath: &str,
2823    ) -> Result<UploadResponse, OpenIAPError> {
2824        // debug!("upload: Uploading file: {}", filepath);
2825        // let mut file = File::open(filepath)
2826        //     .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2827        // let chunk_size = 1024 * 1024;
2828        // let mut buffer = vec![0; chunk_size];
2829
2830        // let envelope = config.to_envelope();
2831        // let (response_rx, rid) = self.send_noawait(envelope).await?;
2832        // {
2833        //     let envelope = BeginStream::from_rid(rid.clone());
2834        //     debug!("Sending beginstream to #{}", rid);
2835        //     self.send_envelope(envelope).await.map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2836        //     let mut counter = 0;
2837
2838        //     loop {
2839        //         let bytes_read = file.read(&mut buffer).map_err(|e| {
2840        //             OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2841        //         })?;
2842        //         counter += 1;
2843
2844        //         if bytes_read == 0 {
2845        //             break;
2846        //         }
2847
2848        //         let chunk = buffer[..bytes_read].to_vec();
2849        //         let envelope = Stream::from_rid(chunk, rid.clone());
2850        //         debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2851        //         self.send_envelope(envelope).await.map_err(|e| {
2852        //             OpenIAPError::ClientError(format!("Failed to send data: {}", e))
2853        //         })?
2854        //     }
2855
2856        //     let envelope = EndStream::from_rid(rid.clone());
2857        //     debug!("Sending endstream to #{}", rid);
2858        //     self.send_envelope(envelope).await
2859        //         .map_err(|e| OpenIAPError::ClientError(format!("Failed to send data: {}", e)))?;
2860        // }
2861
2862        // debug!("Wait for upload response for #{}", rid);
2863        // match response_rx.await {
2864        //     Ok(response) => {
2865        //         if response.command == "error" {
2866        //             let error_response: ErrorResponse = prost::Message::decode(
2867        //                 response.data.unwrap().value.as_ref(),
2868        //             )
2869        //             .map_err(|e| {
2870        //                 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2871        //             })?;
2872        //             return Err(OpenIAPError::ServerError(error_response.message));
2873        //         }
2874        //         let upload_response: UploadResponse =
2875        //             prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2876        //                 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2877        //             })?;
2878        //         Ok(upload_response)
2879        //     }
2880        //     Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2881        // }
2882        debug!("upload: Uploading file: {}", filepath);
2883        let mut file = File::open(filepath)
2884            .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2885        let chunk_size = 1024 * 1024;
2886        let mut buffer = vec![0; chunk_size];
2887    
2888        // Send the initial upload request
2889        let mut envelope = config.to_envelope();
2890        if !env.jwt.is_empty() {
2891            envelope.jwt = env.jwt;
2892        }
2893        if !env.spanid.is_empty() {
2894            envelope.spanid = env.spanid;
2895        }
2896        if !env.traceid.is_empty() {
2897            envelope.traceid = env.traceid;
2898        }
2899        let (response_rx, rid) = self.send_noawait(envelope).await?;
2900        
2901        // Send the BeginStream message
2902        let envelope = BeginStream::from_rid(rid.clone());
2903        debug!("Sending beginstream to #{}", rid);
2904        if let Err(e) = self.send_envelope(envelope).await {
2905            let inner = self.inner.lock().await;
2906            inner.queries.lock().await.remove(&rid);
2907            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2908        }
2909    
2910        // Send file chunks
2911        let mut counter = 0;
2912        loop {
2913            let bytes_read = file.read(&mut buffer).map_err(|e| {
2914                OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2915            })?;
2916            counter += 1;
2917    
2918            if bytes_read == 0 {
2919                break;
2920            }
2921    
2922            let chunk = buffer[..bytes_read].to_vec();
2923            let envelope = Stream::from_rid(chunk, rid.clone());
2924            debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2925            if let Err(e) = self.send_envelope(envelope).await {
2926                let inner = self.inner.lock().await;
2927                inner.queries.lock().await.remove(&rid);
2928                return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2929            }
2930        }
2931    
2932        // Send the EndStream message
2933        let envelope = EndStream::from_rid(rid.clone());
2934        debug!("Sending endstream to #{}", rid);
2935        if let Err(e) = self.send_envelope(envelope).await {
2936            let inner = self.inner.lock().await;
2937            inner.queries.lock().await.remove(&rid);
2938            return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2939        }
2940    
2941        // Await the response and clean up `inner.queries` afterward
2942        debug!("Wait for upload response for #{}", rid);
2943        let result = response_rx.await;
2944        let inner = self.inner.lock().await;
2945        inner.queries.lock().await.remove(&rid);
2946    
2947        match result {
2948            Ok(response) => {
2949                if response.command == "error" {
2950                    let error_response: ErrorResponse = prost::Message::decode(
2951                        response.data.unwrap().value.as_ref(),
2952                    )
2953                    .map_err(|e| {
2954                        OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2955                    })?;
2956                    return Err(OpenIAPError::ServerError(error_response.message));
2957                }
2958                let upload_response: UploadResponse =
2959                    prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2960                        OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2961                    })?;
2962                Ok(upload_response)
2963            }
2964            Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2965        }
2966    }
2967    /// Watch for changes in a collection ( change stream )
2968    #[tracing::instrument(skip_all)]
2969    pub async fn watch(
2970        &self,
2971        mut config: WatchRequest,
2972        env: EnvConfig,
2973        callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2974    ) -> Result<String, OpenIAPError> {
2975        if config.collectionname.is_empty() {
2976            config.collectionname = "entities".to_string();
2977        }
2978        if config.paths.is_empty() {
2979            config.paths = vec!["".to_string()];
2980        }
2981        let mut envelope = config.to_envelope();
2982        if !env.jwt.is_empty() {
2983            envelope.jwt = env.jwt;
2984        }
2985        if !env.spanid.is_empty() {
2986            envelope.spanid = env.spanid;
2987        }
2988        if !env.traceid.is_empty() {
2989            envelope.traceid = env.traceid;
2990        }
2991        let result = self.send(envelope, None).await;
2992        match result {
2993            Ok(m) => {
2994                let data = match m.data {
2995                    Some(data) => data,
2996                    None => {
2997                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
2998                    }
2999                };
3000                if m.command == "error" {
3001                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3002                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3003                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3004                }
3005                let response: WatchResponse = prost::Message::decode(data.value.as_ref())
3006                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3007
3008                let inner = self.inner.lock().await;
3009                inner
3010                    .watches
3011                    .lock()
3012                    .await
3013                    .insert(response.id.clone(), callback);
3014
3015                Ok(response.id)
3016            }
3017            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3018        }
3019    }
3020    /// Cancel a watch ( change stream )
3021    #[tracing::instrument(skip_all)]
3022    pub async fn unwatch(&self, env: EnvConfig, id: &str) -> Result<(), OpenIAPError> {
3023        let config = UnWatchRequest::byid(id);
3024        let mut envelope = config.to_envelope();
3025        if !env.jwt.is_empty() {
3026            envelope.jwt = env.jwt;
3027        }
3028        if !env.spanid.is_empty() {
3029            envelope.spanid = env.spanid;
3030        }
3031        if !env.traceid.is_empty() {
3032            envelope.traceid = env.traceid;
3033        }
3034        let result = self.send(envelope, None).await;
3035        match result {
3036            Ok(m) => {
3037                let data = match m.data {
3038                    Some(data) => data,
3039                    None => {
3040                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3041                    }
3042                };
3043                if m.command == "error" {
3044                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3045                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3046                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3047                }
3048                Ok(())
3049            }
3050            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3051        }
3052    }
3053    /// Register a queue for messaging ( amqp ) in the OpenIAP service
3054    #[tracing::instrument(skip_all)]
3055    pub async fn register_queue(
3056        &self,
3057        mut config: RegisterQueueRequest,
3058        env: EnvConfig,
3059        callback: QueueCallbackFn,
3060    ) -> Result<String, OpenIAPError> {
3061        if config.queuename.is_empty() {
3062            config.queuename = "".to_string();
3063        }
3064        let mut envelope = config.to_envelope();
3065        if !env.jwt.is_empty() {
3066            envelope.jwt = env.jwt;
3067        }
3068        if !env.spanid.is_empty() {
3069            envelope.spanid = env.spanid;
3070        }
3071        if !env.traceid.is_empty() {
3072            envelope.traceid = env.traceid;
3073        }
3074        let result = self.send(envelope, None).await;
3075        match result {
3076            Ok(m) => {
3077                let data = match m.data {
3078                    Some(data) => data,
3079                    None => {
3080                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3081                    }
3082                };
3083                if m.command == "error" {
3084                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3085                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3086                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3087                }
3088                let response: RegisterQueueResponse =
3089                    prost::Message::decode(data.value.as_ref())
3090                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3091
3092                let inner = self.inner.lock().await;
3093                inner
3094                    .queues
3095                    .lock()
3096                    .await
3097                    .insert(response.queuename.clone(), callback);
3098
3099                Ok(response.queuename)
3100            }
3101            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3102        }
3103    }
3104    /// Unregister a queue or exchange for messaging ( amqp ) in the OpenIAP service
3105    #[tracing::instrument(skip_all)]
3106    pub async fn unregister_queue(&self, env: EnvConfig, queuename: &str) -> Result<(), OpenIAPError> {
3107        let config = UnRegisterQueueRequest::byqueuename(queuename);
3108        let mut envelope = config.to_envelope();
3109        if !env.jwt.is_empty() {
3110            envelope.jwt = env.jwt;
3111        }
3112        if !env.spanid.is_empty() {
3113            envelope.spanid = env.spanid;
3114        }
3115        if !env.traceid.is_empty() {
3116            envelope.traceid = env.traceid;
3117        }
3118        let result = self.send(envelope, None).await;
3119        match result {
3120            Ok(m) => {
3121                let data = match m.data {
3122                    Some(data) => data,
3123                    None => {
3124                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3125                    }
3126                };
3127                if m.command == "error" {
3128                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3129                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3130                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3131                }
3132                Ok(())
3133            }
3134            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3135        }
3136    }
3137    /// Register a exchange for messaging ( amqp ) in the OpenIAP service
3138    #[tracing::instrument(skip_all)]
3139    pub async fn register_exchange(
3140        &self,
3141        mut config: RegisterExchangeRequest,
3142        env: EnvConfig,
3143        callback: QueueCallbackFn,
3144    ) -> Result<String, OpenIAPError> {
3145        if config.exchangename.is_empty() {
3146            return Err(OpenIAPError::ClientError(
3147                "No exchange name provided".to_string(),
3148            ));
3149        }
3150        if config.algorithm.is_empty() {
3151            config.algorithm = "fanout".to_string();
3152        }
3153        let mut envelope = config.to_envelope();
3154        if !env.jwt.is_empty() {
3155            envelope.jwt = env.jwt;
3156        }
3157        if !env.spanid.is_empty() {
3158            envelope.spanid = env.spanid;
3159        }
3160        if !env.traceid.is_empty() {
3161            envelope.traceid = env.traceid;
3162        }
3163        let result = self.send(envelope, None).await;
3164        match result {
3165            Ok(m) => {
3166                let data = match m.data {
3167                    Some(data) => data,
3168                    None => {
3169                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3170                    }
3171                };
3172                if m.command == "error" {
3173                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3174                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3175                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3176                }
3177                let response: RegisterExchangeResponse =
3178                    prost::Message::decode(data.value.as_ref())
3179                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3180                if !response.queuename.is_empty() {
3181                    let inner = self.inner.lock().await;
3182                    inner
3183                        .queues
3184                        .lock()
3185                        .await
3186                        .insert(response.queuename.clone(), callback);
3187                }
3188                Ok(response.queuename)
3189            }
3190            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3191        }
3192    }
3193    /// Send a message to a queue or exchange in the OpenIAP service
3194    #[tracing::instrument(skip_all)]
3195    pub async fn queue_message(
3196        &self,
3197        config: QueueMessageRequest,
3198        env: EnvConfig,
3199    ) -> Result<QueueMessageResponse, OpenIAPError> {
3200        if config.queuename.is_empty() && config.exchangename.is_empty() {
3201            return Err(OpenIAPError::ClientError(
3202                "No queue or exchange name provided".to_string(),
3203            ));
3204        }
3205        let mut envelope = config.to_envelope();
3206        if !env.jwt.is_empty() {
3207            envelope.jwt = env.jwt;
3208        }
3209        if !env.spanid.is_empty() {
3210            envelope.spanid = env.spanid;
3211        }
3212        if !env.traceid.is_empty() {
3213            envelope.traceid = env.traceid;
3214        }
3215        let result = self.send(envelope, None).await;
3216        match result {
3217            Ok(m) => {
3218                let data = match m.data {
3219                    Some(d) => d,
3220                    None => {
3221                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
3222                    }
3223                };
3224                if m.command == "error" {
3225                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3226                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3227                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3228                }
3229                let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
3230                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3231                Ok(response)
3232            }
3233            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3234        }
3235    }
3236    /// Send message to a queue or exchange in the OpenIAP service, and wait for a reply
3237    #[tracing::instrument(skip_all)]
3238    pub async fn rpc(&self, mut config: QueueMessageRequest, env: EnvConfig, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
3239        if config.queuename.is_empty() && config.exchangename.is_empty() {
3240            return Err(OpenIAPError::ClientError(
3241                "No queue or exchange name provided".to_string(),
3242            ));
3243        }
3244
3245        let (tx, rx) = oneshot::channel::<String>();
3246        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3247
3248        // Prepare the callback
3249        let callback: QueueCallbackFn = Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3250            if let Some(tx) = tx.lock().unwrap().take() {
3251                let _ = tx.send(event.data);
3252            } else {
3253                debug!("Queue already closed");
3254            }
3255            Box::pin(async { None })
3256        });
3257
3258        // Check if we already have a reply queue and callback registered
3259        let mut reply_queue_guard = self.rpc_reply_queue.lock().await;
3260        let mut callback_guard = self.rpc_callback.lock().await;
3261
3262        // If not registered, or if connection state is not Connected/Signedin, re-register
3263        let state = self.get_state();
3264        if reply_queue_guard.is_none() || !(state == ClientState::Connected || state == ClientState::Signedin) {
3265            // Register a new reply queue
3266            let q = self
3267                .register_queue(
3268                    RegisterQueueRequest {
3269                        queuename: "".to_string(),
3270                    },
3271                    crate::EnvConfig::new(),
3272                    callback.clone(),
3273                )
3274                .await?;
3275            *reply_queue_guard = Some(q.clone());
3276            *callback_guard = Some(callback.clone());
3277        } else {
3278            // Already registered, but need to update the callback for this call
3279            if let Some(qname) = reply_queue_guard.as_ref() {
3280                let inner = self.inner.lock().await;
3281                inner.queues.lock().await.insert(qname.clone(), callback.clone());
3282            }
3283        }
3284
3285        let q = reply_queue_guard.as_ref().unwrap().clone();
3286        config.replyto = q.clone();
3287        let mut envelope = config.to_envelope();
3288        if !env.jwt.is_empty() {
3289            envelope.jwt = env.jwt;
3290        }
3291        if !env.spanid.is_empty() {
3292            envelope.spanid = env.spanid;
3293        }
3294        if !env.traceid.is_empty() {
3295            envelope.traceid = env.traceid;
3296        }
3297
3298        let result = self.send(envelope, None).await;
3299        let rpc_result = match result {
3300            Ok(m) => {
3301                let data = match m.data {
3302                    Some(d) => d,
3303                    None => {
3304                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
3305                    }
3306                };
3307                if m.command == "error" {
3308                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3309                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3310                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3311                }
3312
3313                match tokio::time::timeout(timeout, rx).await {
3314                    Ok(Ok(val)) => Ok(val),
3315                    Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3316                    Err(_) => {
3317                        // Timeout: clear the cached queue so it will be re-registered next time
3318                        *reply_queue_guard = None;
3319                        *callback_guard = None;
3320                        Err(OpenIAPError::ClientError("RPC request timed out".to_string()))
3321                    },
3322                }
3323            }
3324            Err(e) => {
3325                // If we get an error, clear the cached queue so it will be re-registered next time
3326                *reply_queue_guard = None;
3327                *callback_guard = None;
3328                Err(OpenIAPError::ClientError(e.to_string()))
3329            }
3330        };
3331
3332        rpc_result
3333    }
3334    // pub async fn rpc2(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
3335    //     if config.queuename.is_empty() && config.exchangename.is_empty() {
3336    //         return Err(OpenIAPError::ClientError(
3337    //             "No queue or exchange name provided".to_string(),
3338    //         ));
3339    //     }
3340
3341    //     let (tx, rx) = oneshot::channel::<String>();
3342    //     let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3343
3344    //     let q = self
3345    //         .register_queue(
3346    //             RegisterQueueRequest {
3347    //                 queuename: "".to_string(),
3348    //             },
3349    //             Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3350    //                 if let Some(tx) = tx.lock().unwrap().take() {
3351    //                     let _ = tx.send(event.data);
3352    //                 } else {
3353    //                     debug!("Queue already closed");
3354    //                 }
3355    //                 Box::pin(async { None })
3356    //             }),
3357    //         )
3358    //         .await
3359    //         .unwrap();
3360
3361    //     config.replyto = q.clone();
3362    //     let envelope = config.to_envelope();
3363
3364    //     let result = self.send(envelope, None).await;
3365    //     let rpc_result = match result {
3366    //         Ok(m) => {
3367    //             let data = match m.data {
3368    //                 Some(d) => d,
3369    //                 None => {
3370    //                     return Err(OpenIAPError::ClientError("No data in response".to_string()))
3371    //                 }
3372    //             };
3373    //             if m.command == "error" {
3374    //                 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3375    //                     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3376    //                 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3377    //             }
3378
3379    //             match tokio::time::timeout(timeout, rx).await {
3380    //                 Ok(Ok(val)) => Ok(val),
3381    //                 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3382    //                 Err(_) => Err(OpenIAPError::ClientError("RPC request timed out".to_string())),
3383    //             }
3384    //         }
3385    //         Err(e) => {
3386    //             let unregister_err = self.unregister_queue(&q).await.err();
3387    //             if unregister_err.is_some() {
3388    //                 error!("Failed to unregister Response Queue: {:?}", unregister_err);
3389    //             }
3390    //             Err(OpenIAPError::ClientError(e.to_string()))
3391    //         }
3392    //     };
3393
3394    //     if let Err(e) = self.unregister_queue(&q).await {
3395    //         error!("Failed to unregister Response Queue: {:?}", e);
3396    //     } else {
3397    //         debug!("Unregistered Response Queue: {:?}", q);
3398    //     }
3399    //     match rpc_result {
3400    //         Ok(val) => Ok(val),
3401    //         Err(e) => Err(e),
3402    //     }
3403    // }
3404    /// Push a new workitem to a workitem queue
3405    /// If the file is less than 5 megabytes it will be attached to the workitem
3406    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3407    #[tracing::instrument(skip_all)]
3408    pub async fn push_workitem(
3409        &self,
3410        mut config: PushWorkitemRequest,
3411        env: EnvConfig,
3412    ) -> Result<PushWorkitemResponse, OpenIAPError> {
3413        if config.wiq.is_empty() && config.wiqid.is_empty() {
3414            return Err(OpenIAPError::ClientError(
3415                "No queue name or id provided".to_string(),
3416            ));
3417        }
3418        for f in &mut config.files {
3419            if f.filename.is_empty() && f.file.is_empty() {
3420                debug!("Filename is empty");
3421            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3422                // does file exist?
3423                if !std::path::Path::new(&f.filename).exists() {
3424                    debug!("File does not exist: {}", f.filename);
3425                } else {
3426                    let filesize = std::fs::metadata(&f.filename).unwrap().len();
3427                    // if filesize is less than 5 meggabytes attach it, else upload
3428                    if filesize < 5 * 1024 * 1024 {
3429                        debug!("File {} exists so ATTACHING it.", f.filename);
3430                        let filename = std::path::Path::new(&f.filename)
3431                            .file_name()
3432                            .unwrap()
3433                            .to_str()
3434                            .unwrap();
3435                        f.file = std::fs::read(&f.filename).unwrap();
3436                        // f.file = compress_file(&f.filename).unwrap();
3437                        // f.compressed = false;
3438                        f.file = util::compress_file_to_vec(&f.filename).unwrap();
3439                        f.compressed = true;
3440                        f.filename = filename.to_string();
3441                        f.id = "findme".to_string();
3442                        trace!(
3443                            "File {} was read and assigned to f.file, size: {}",
3444                            f.filename,
3445                            f.file.len()
3446                        );
3447                    } else {
3448                        debug!("File {} exists so UPLOADING it.", f.filename);
3449                        let filename = std::path::Path::new(&f.filename)
3450                            .file_name()
3451                            .unwrap()
3452                            .to_str()
3453                            .unwrap();
3454                        let uploadconfig = UploadRequest {
3455                            filename: filename.to_string(),
3456                            collectionname: "fs.files".to_string(),
3457                            ..Default::default()
3458                        };
3459                        let uploadresult = self.upload(uploadconfig, crate::EnvConfig::new(), &f.filename).await.unwrap();
3460                        trace!("File {} was upload as {}", filename, uploadresult.id);
3461                        // f.filename = "".to_string();
3462                        f.id = uploadresult.id.clone();
3463                        f.filename = filename.to_string();
3464                    }
3465                }
3466            } else {
3467                debug!("File {} is already uploaded", f.filename);
3468            }
3469        }
3470        let mut envelope = config.to_envelope();
3471        if !env.jwt.is_empty() {
3472            envelope.jwt = env.jwt;
3473        }
3474        if !env.spanid.is_empty() {
3475            envelope.spanid = env.spanid;
3476        }
3477        if !env.traceid.is_empty() {
3478            envelope.traceid = env.traceid;
3479        }
3480        let result = self.send(envelope, None).await;
3481        match result {
3482            Ok(m) => {
3483                let data = match m.data {
3484                    Some(data) => data,
3485                    None => {
3486                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3487                    }
3488                };
3489                if m.command == "error" {
3490                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3491                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3492                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3493                }
3494                let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3495                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3496                Ok(response)
3497            }
3498            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3499        }
3500    }
3501    /// Push multiple workitems to a workitem queue
3502    /// If the file is less than 5 megabytes it will be attached to the workitem
3503    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3504    #[tracing::instrument(skip_all)]
3505    pub async fn push_workitems(
3506        &self,
3507        mut config: PushWorkitemsRequest,
3508        env: EnvConfig,
3509    ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3510        if config.wiq.is_empty() && config.wiqid.is_empty() {
3511            return Err(OpenIAPError::ClientError(
3512                "No queue name or id provided".to_string(),
3513            ));
3514        }
3515        for wi in &mut config.items {
3516            for f in &mut wi.files {
3517                if f.filename.is_empty() && f.file.is_empty() {
3518                    debug!("Filename is empty");
3519                } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3520                    // does file exist?
3521                    if !std::path::Path::new(&f.filename).exists() {
3522                        debug!("File does not exist: {}", f.filename);
3523                    } else {
3524                        let filesize = std::fs::metadata(&f.filename).unwrap().len();
3525                        // if filesize is less than 5 meggabytes attach it, else upload
3526                        if filesize < 5 * 1024 * 1024 {
3527                            debug!("File {} exists so ATTACHING it.", f.filename);
3528                            let filename = std::path::Path::new(&f.filename)
3529                                .file_name()
3530                                .unwrap()
3531                                .to_str()
3532                                .unwrap();
3533                            f.file = std::fs::read(&f.filename).unwrap();
3534                            // f.file = compress_file(&f.filename).unwrap();
3535                            // f.compressed = false;
3536                            f.file = util::compress_file_to_vec(&f.filename).unwrap();
3537                            f.compressed = true;
3538                            f.filename = filename.to_string();
3539                            f.id = "findme".to_string();
3540                            trace!(
3541                                "File {} was read and assigned to f.file, size: {}",
3542                                f.filename,
3543                                f.file.len()
3544                            );
3545                        } else {
3546                            debug!("File {} exists so UPLOADING it.", f.filename);
3547                            let filename = std::path::Path::new(&f.filename)
3548                                .file_name()
3549                                .unwrap()
3550                                .to_str()
3551                                .unwrap();
3552                            let uploadconfig = UploadRequest {
3553                                filename: filename.to_string(),
3554                                collectionname: "fs.files".to_string(),
3555                                ..Default::default()
3556                            };
3557                            let uploadresult =
3558                                self.upload(uploadconfig, crate::EnvConfig::new(), &f.filename).await.unwrap();
3559                            trace!("File {} was upload as {}", filename, uploadresult.id);
3560                            // f.filename = "".to_string();
3561                            f.id = uploadresult.id.clone();
3562                            f.filename = filename.to_string();
3563                        }
3564                    }
3565                } else {
3566                    debug!("File {} is already uploaded", f.filename);
3567                }
3568            }
3569        }
3570        let mut envelope = config.to_envelope();
3571        if !env.jwt.is_empty() {
3572            envelope.jwt = env.jwt;
3573        }
3574        if !env.spanid.is_empty() {
3575            envelope.spanid = env.spanid;
3576        }
3577        if !env.traceid.is_empty() {
3578            envelope.traceid = env.traceid;
3579        }
3580        let result = self.send(envelope, None).await;
3581        match result {
3582            Ok(m) => {
3583                let data = match m.data {
3584                    Some(data) => data,
3585                    None => {
3586                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3587                    }
3588                };
3589                if m.command == "error" {
3590                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3591                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3592                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3593                }
3594                let response: PushWorkitemsResponse =
3595                    prost::Message::decode(data.value.as_ref())
3596                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3597                Ok(response)
3598            }
3599            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3600        }
3601    }
3602    /// Pop a workitem from a workitem queue, return None if no workitem is available
3603    /// Any files attached to the workitem will be downloaded to the downloadfolder ( default "." )
3604    #[tracing::instrument(skip_all)]
3605    pub async fn pop_workitem(
3606        &self,
3607        config: PopWorkitemRequest,
3608        env: EnvConfig,
3609        downloadfolder: Option<&str>,
3610    ) -> Result<PopWorkitemResponse, OpenIAPError> {
3611        if config.wiq.is_empty() && config.wiqid.is_empty() {
3612            return Err(OpenIAPError::ClientError(
3613                "No queue name or id provided".to_string(),
3614            ));
3615        }
3616        let mut envelope = config.to_envelope();
3617        if !env.jwt.is_empty() {
3618            envelope.jwt = env.jwt;
3619        }
3620        if !env.spanid.is_empty() {
3621            envelope.spanid = env.spanid;
3622        }
3623        if !env.traceid.is_empty() {
3624            envelope.traceid = env.traceid;
3625        }
3626        let result = self.send(envelope, None).await;
3627        match result {
3628            Ok(m) => {
3629                let data = match m.data {
3630                    Some(data) => data,
3631                    None => {
3632                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
3633                    }
3634                };
3635                if m.command == "error" {
3636                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3637                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3638                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3639                }
3640                let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3641                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3642
3643                match &response.workitem {
3644                    Some(wi) => {
3645                        for f in &wi.files {
3646                            if !f.id.is_empty() {
3647                                let downloadconfig = DownloadRequest {
3648                                    id: f.id.clone(),
3649                                    collectionname: "fs.files".to_string(),
3650                                    ..Default::default()
3651                                };
3652                                let downloadresult =
3653                                    match self.download(downloadconfig,
3654                                        crate::EnvConfig::new(),
3655                                        downloadfolder, None).await
3656                                    {
3657                                        Ok(r) => r,
3658                                        Err(e) => {
3659                                            debug!("Failed to download file: {}", e);
3660                                            continue;
3661                                        }
3662                                    };
3663                                debug!(
3664                                    "File {} was downloaded as {}",
3665                                    f.filename, downloadresult.filename
3666                                );
3667                            }
3668                        }
3669                    }
3670                    None => {
3671                        debug!("No workitem found");
3672                    }
3673                }
3674                Ok(response)
3675            }
3676            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3677        }
3678    }
3679    /// Update a workitem in a workitem queue
3680    /// If the file is less than 5 megabytes it will be attached to the workitem
3681    /// If the file is larger than 5 megabytes it will be uploaded to the database and attached to the workitem
3682    /// If a fileid is provided it will be used to update the file
3683    /// if a filename is provided without the id, it will be deleted
3684    #[tracing::instrument(skip_all)]
3685    pub async fn update_workitem(
3686        &self,
3687        mut config: UpdateWorkitemRequest,
3688        env: EnvConfig,
3689    ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3690        match &config.workitem {
3691            Some(wiq) => {
3692                if wiq.id.is_empty() {
3693                    return Err(OpenIAPError::ClientError(
3694                        "No workitem id provided".to_string(),
3695                    ));
3696                }
3697            }
3698            None => {
3699                return Err(OpenIAPError::ClientError(
3700                    "No workitem provided".to_string(),
3701                ));
3702            }
3703        }
3704        for f in &mut config.files {
3705            if f.filename.is_empty() && f.file.is_empty() {
3706                debug!("Filename is empty");
3707            } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3708                if !std::path::Path::new(&f.filename).exists() {
3709                    debug!("File does not exist: {}", f.filename);
3710                } else {
3711                    debug!("File {} exists so uploading it.", f.filename);
3712                    let filename = std::path::Path::new(&f.filename)
3713                        .file_name()
3714                        .unwrap()
3715                        .to_str()
3716                        .unwrap();
3717                    let uploadconfig = UploadRequest {
3718                        filename: filename.to_string(),
3719                        collectionname: "fs.files".to_string(),
3720                        ..Default::default()
3721                    };
3722                    let uploadresult = self.upload(uploadconfig, crate::EnvConfig::new(), &f.filename).await.unwrap();
3723                    trace!("File {} was upload as {}", filename, uploadresult.id);
3724                    f.id = uploadresult.id.clone();
3725                    f.filename = filename.to_string();
3726                }
3727            } else {
3728                debug!("Skipped file");
3729            }
3730        }
3731        let mut envelope = config.to_envelope();
3732        if !env.jwt.is_empty() {
3733            envelope.jwt = env.jwt;
3734        }
3735        if !env.spanid.is_empty() {
3736            envelope.spanid = env.spanid;
3737        }
3738        if !env.traceid.is_empty() {
3739            envelope.traceid = env.traceid;
3740        }
3741        let result = self.send(envelope, None).await;
3742        match result {
3743            Ok(m) => {
3744                let data = match m.data {
3745                    Some(d) => d,
3746                    None => {
3747                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3748                    }
3749                };
3750                if m.command == "error" {
3751                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3752                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3753                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3754                }
3755                let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3756                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3757                Ok(response)
3758            }
3759            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3760        }
3761    }
3762    /// Delete a workitem from a workitem queue
3763    #[tracing::instrument(skip_all)]
3764    pub async fn delete_workitem(
3765        &self,
3766        config: DeleteWorkitemRequest,
3767        env: EnvConfig,
3768    ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3769        if config.id.is_empty() {
3770            return Err(OpenIAPError::ClientError(
3771                "No workitem id provided".to_string(),
3772            ));
3773        }
3774        let mut envelope = config.to_envelope();
3775        if !env.jwt.is_empty() {
3776            envelope.jwt = env.jwt;
3777        }
3778        if !env.spanid.is_empty() {
3779            envelope.spanid = env.spanid;
3780        }
3781        if !env.traceid.is_empty() {
3782            envelope.traceid = env.traceid;
3783        }
3784        let result = self.send(envelope, None).await;
3785        match result {
3786            Ok(m) => {
3787                let data = match m.data {
3788                    Some(d) => d,
3789                    None => {
3790                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3791                    }
3792                };
3793                if m.command == "error" {
3794                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3795                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3796                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3797                }
3798                let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3799                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3800                Ok(response)
3801            }
3802            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3803        }
3804    }
3805    /// Add a workitem queue to openiap instance
3806    #[tracing::instrument(skip_all)]
3807    pub async fn add_workitem_queue(
3808        &self,
3809        config: AddWorkItemQueueRequest,
3810        env: EnvConfig,
3811    ) -> Result<WorkItemQueue, OpenIAPError> {
3812        if config.workitemqueue.is_none() {
3813            return Err(OpenIAPError::ClientError(
3814                "No workitem queue name provided".to_string(),
3815            ));
3816        }
3817        let mut envelope = config.to_envelope();
3818        if !env.jwt.is_empty() {
3819            envelope.jwt = env.jwt;
3820        }
3821        if !env.spanid.is_empty() {
3822            envelope.spanid = env.spanid;
3823        }
3824        if !env.traceid.is_empty() {
3825            envelope.traceid = env.traceid;
3826        }
3827        let result = self.send(envelope, None).await;
3828        match result {
3829            Ok(m) => {
3830                let data = match m.data {
3831                    Some(d) => d,
3832                    None => {
3833                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3834                    }
3835                };
3836                if m.command == "error" {
3837                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3838                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3839                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3840                }
3841                let response: AddWorkItemQueueResponse =
3842                    prost::Message::decode(data.value.as_ref())
3843                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3844                match response.workitemqueue {
3845                    Some(wiq) => Ok(wiq),
3846                    None => {
3847                        return Err(OpenIAPError::ClientError(
3848                            "No workitem queue returned".to_string(),
3849                        ));
3850                    }
3851                }
3852            }
3853            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3854        }
3855    }
3856    /// Update a workitem queue in openiap instance
3857    #[tracing::instrument(skip_all)]
3858    pub async fn update_workitem_queue(
3859        &self,
3860        config: UpdateWorkItemQueueRequest,
3861        env: EnvConfig,
3862    ) -> Result<WorkItemQueue, OpenIAPError> {
3863        if config.workitemqueue.is_none() {
3864            return Err(OpenIAPError::ClientError(
3865                "No workitem queue name provided".to_string(),
3866            ));
3867        }
3868        let mut envelope = config.to_envelope();
3869        if !env.jwt.is_empty() {
3870            envelope.jwt = env.jwt;
3871        }
3872        if !env.spanid.is_empty() {
3873            envelope.spanid = env.spanid;
3874        }
3875        if !env.traceid.is_empty() {
3876            envelope.traceid = env.traceid;
3877        }
3878        let result = self.send(envelope, None).await;
3879        match result {
3880            Ok(m) => {
3881                let data = match m.data {
3882                    Some(d) => d,
3883                    None => {
3884                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3885                    }
3886                };
3887                if m.command == "error" {
3888                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3889                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3890                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3891                }
3892                let response: UpdateWorkItemQueueResponse =
3893                    prost::Message::decode(data.value.as_ref())
3894                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3895                match response.workitemqueue {
3896                    Some(wiq) => Ok(wiq),
3897                    None => {
3898                        return Err(OpenIAPError::ClientError(
3899                            "No workitem queue returned".to_string(),
3900                        ));
3901                    }
3902                }
3903            }
3904            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3905        }
3906    }
3907    /// Delete a workitem queue from openiap instance
3908    #[tracing::instrument(skip_all)]
3909    pub async fn delete_workitem_queue(
3910        &self,
3911        config: DeleteWorkItemQueueRequest,
3912        env: EnvConfig,
3913    ) -> Result<(), OpenIAPError> {
3914        if config.wiq.is_empty() && config.wiqid.is_empty() {
3915            return Err(OpenIAPError::ClientError(
3916                "No workitem queue name or id provided".to_string(),
3917            ));
3918        }
3919        let mut envelope = config.to_envelope();
3920        if !env.jwt.is_empty() {
3921            envelope.jwt = env.jwt;
3922        }
3923        if !env.spanid.is_empty() {
3924            envelope.spanid = env.spanid;
3925        }
3926        if !env.traceid.is_empty() {
3927            envelope.traceid = env.traceid;
3928        }
3929        let result = self.send(envelope, None).await;
3930        match result {
3931            Ok(m) => {
3932                let data = match m.data {
3933                    Some(d) => d,
3934                    None => {
3935                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3936                    }
3937                };
3938                if m.command == "error" {
3939                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3940                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3941                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3942                }
3943                Ok(())
3944            }
3945            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3946        }
3947    }
3948    /// Run custom command on server. Custom commands are commands who is "on trail", they may change and are not ready to be moved to the fixed protobuf format yet
3949    #[tracing::instrument(skip_all)]
3950    pub async fn custom_command(
3951        &self,
3952        config: CustomCommandRequest,
3953        env: EnvConfig,
3954        timeout: Option<tokio::time::Duration>,
3955    ) -> Result<String, OpenIAPError> {
3956        if config.command.is_empty() {
3957            return Err(OpenIAPError::ClientError("No command provided".to_string()));
3958        }
3959        let mut envelope = config.to_envelope();
3960        if !env.jwt.is_empty() {
3961            envelope.jwt = env.jwt;
3962        }
3963        if !env.spanid.is_empty() {
3964            envelope.spanid = env.spanid;
3965        }
3966        if !env.traceid.is_empty() {
3967            envelope.traceid = env.traceid;
3968        }
3969        let result = self.send(envelope, timeout).await;
3970        match result {
3971            Ok(m) => {
3972                let data = match m.data {
3973                    Some(d) => d,
3974                    None => {
3975                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
3976                    }
3977                };
3978                if m.command == "error" {
3979                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3980                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3981                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3982                }
3983                let response: CustomCommandResponse =
3984                    prost::Message::decode(data.value.as_ref())
3985                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3986                Ok(response.result)
3987            }
3988            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3989        }
3990    }
3991    /// Delete a package from the database, cleaning up all all files and data
3992    #[tracing::instrument(skip_all)]
3993    pub async fn delete_package(&self, env: EnvConfig, packageid: &str) -> Result<(), OpenIAPError> {
3994        let config = DeletePackageRequest::byid(packageid);
3995        let mut envelope = config.to_envelope();
3996        if !env.jwt.is_empty() {
3997            envelope.jwt = env.jwt;
3998        }
3999        if !env.spanid.is_empty() {
4000            envelope.spanid = env.spanid;
4001        }
4002        if !env.traceid.is_empty() {
4003            envelope.traceid = env.traceid;
4004        }
4005        let result = self.send(envelope, None).await;
4006        match result {
4007            Ok(m) => {
4008                let data = match m.data {
4009                    Some(data) => data,
4010                    None => {
4011                        return Err(OpenIAPError::ClientError("No data returned".to_string()));
4012                    }
4013                };
4014                if m.command == "error" {
4015                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4016                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4017                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4018                }
4019                // prost::Message::decode(data.value.as_ref())
4020                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4021                Ok(())
4022            }
4023            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4024        }
4025    }
4026    /// Start Agent
4027    #[tracing::instrument(skip_all)]
4028    pub async fn start_agent(&self, env: EnvConfig, agentid: &str) -> Result<(), OpenIAPError> {
4029        let config = StartAgentRequest::byid(agentid);
4030        let mut envelope = config.to_envelope();
4031        if !env.jwt.is_empty() {
4032            envelope.jwt = env.jwt;
4033        }
4034        if !env.spanid.is_empty() {
4035            envelope.spanid = env.spanid;
4036        }
4037        if !env.traceid.is_empty() {
4038            envelope.traceid = env.traceid;
4039        }
4040        let result = self.send(envelope, None).await;
4041        match result {
4042            Ok(m) => {
4043                let data = match m.data {
4044                    Some(d) => d,
4045                    None => {
4046                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4047                    }
4048                };
4049                if m.command == "error" {
4050                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4051                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4052                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4053                }
4054                // prost::Message::decode(data.value.as_ref())
4055                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4056                Ok(())
4057            }
4058            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4059        }
4060    }
4061    /// Stop an agent, this will cleanup all resources and stop the agent
4062    #[tracing::instrument(skip_all)]
4063    pub async fn stop_agent(&self, env: EnvConfig, agentid: &str) -> Result<(), OpenIAPError> {
4064        let config = StopAgentRequest::byid(agentid);
4065        let mut envelope = config.to_envelope();
4066        if !env.jwt.is_empty() {
4067            envelope.jwt = env.jwt;
4068        }
4069        if !env.spanid.is_empty() {
4070            envelope.spanid = env.spanid;
4071        }
4072        if !env.traceid.is_empty() {
4073            envelope.traceid = env.traceid;
4074        }
4075        let result = self.send(envelope, None).await;
4076        match result {
4077            Ok(m) => {
4078                let data = match m.data {
4079                    Some(d) => d,
4080                    None => {
4081                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4082                    }
4083                };
4084                if m.command == "error" {
4085                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4086                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4087                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4088                }
4089                // prost::Message::decode(data.value.as_ref())
4090                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4091                Ok(())
4092            }
4093            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4094        }
4095    }
4096    /// Delete a pod from an agent, on kubernetes this will remove the pod and kubernetes will re-create it, on docker this will remove the pod. Then use start_agent to start the agent again
4097    #[tracing::instrument(skip_all)]
4098    pub async fn delete_agent_pod(&self, env: EnvConfig, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
4099        let config = DeleteAgentPodRequest::byid(agentid, podname);
4100        let mut envelope = config.to_envelope();
4101        if !env.jwt.is_empty() {
4102            envelope.jwt = env.jwt;
4103        }
4104        if !env.spanid.is_empty() {
4105            envelope.spanid = env.spanid;
4106        }
4107        if !env.traceid.is_empty() {
4108            envelope.traceid = env.traceid;
4109        }
4110        let result = self.send(envelope, None).await;
4111        match result {
4112            Ok(m) => {
4113                let data = match m.data {
4114                    Some(d) => d,
4115                    None => {
4116                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4117                    }
4118                };
4119                if m.command == "error" {
4120                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4121                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4122                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4123                }
4124                // prost::Message::decode(data.value.as_ref())
4125                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4126                Ok(())
4127            }
4128            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4129        }
4130    }
4131    /// Delete an agent, this will cleanup all resources and delete the agent
4132    #[tracing::instrument(skip_all)]
4133    pub async fn delete_agent(&self, env: EnvConfig, agentid: &str) -> Result<(), OpenIAPError> {
4134        let config = DeleteAgentRequest::byid(agentid);
4135        let mut envelope = config.to_envelope();
4136        if !env.jwt.is_empty() {
4137            envelope.jwt = env.jwt;
4138        }
4139        if !env.spanid.is_empty() {
4140            envelope.spanid = env.spanid;
4141        }
4142        if !env.traceid.is_empty() {
4143            envelope.traceid = env.traceid;
4144        }
4145        let result = self.send(envelope, None).await;
4146        match result {
4147            Ok(m) => {
4148                let data = match m.data {
4149                    Some(d) => d,
4150                    None => {
4151                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4152                    }
4153                };
4154                if m.command == "error" {
4155                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4156                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4157                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4158                }
4159                // prost::Message::decode(data.value.as_ref())
4160                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4161                Ok(())
4162            }
4163            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4164        }
4165    }
4166    /// Get all pods associated with an agent, if stats is true, it will return memory and cpu usage for each pod
4167    #[tracing::instrument(skip_all)]
4168    pub async fn get_agent_pods(&self, env: EnvConfig, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
4169        let config = GetAgentPodsRequest::byid(agentid, stats);
4170        let mut envelope = config.to_envelope();
4171        if !env.jwt.is_empty() {
4172            envelope.jwt = env.jwt;
4173        }
4174        if !env.spanid.is_empty() {
4175            envelope.spanid = env.spanid;
4176        }
4177        if !env.traceid.is_empty() {
4178            envelope.traceid = env.traceid;
4179        }
4180        let result = self.send(envelope, None).await;
4181        match result {
4182            Ok(m) => {
4183                let data = match m.data {
4184                    Some(d) => d,
4185                    None => {
4186                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4187                    }
4188                };
4189                if m.command == "error" {
4190                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4191                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4192                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4193                }
4194                let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
4195                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4196                Ok(response.results)
4197            }
4198            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4199        }
4200    }
4201    /// Get logs from a pod associated with an agent, leave podname empty to get logs from all pods
4202    #[tracing::instrument(skip_all)]
4203    pub async fn get_agent_pod_logs(
4204        &self,
4205        env: EnvConfig,
4206        agentid: &str,
4207        podname: &str,
4208    ) -> Result<String, OpenIAPError> {
4209        let config = GetAgentLogRequest::new(agentid, podname);
4210        let mut envelope = config.to_envelope();
4211        if !env.jwt.is_empty() {
4212            envelope.jwt = env.jwt;
4213        }
4214        if !env.spanid.is_empty() {
4215            envelope.spanid = env.spanid;
4216        }
4217        if !env.traceid.is_empty() {
4218            envelope.traceid = env.traceid;
4219        }
4220        let result = self.send(envelope, None).await;
4221        match result {
4222            Ok(m) => {
4223                let data = match m.data {
4224                    Some(d) => d,
4225                    None => {
4226                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4227                    }
4228                };
4229                if m.command == "error" {
4230                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4231                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4232                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4233                }
4234                let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
4235                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4236                Ok(response.result)
4237            }
4238            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4239        }
4240    }
4241
4242    /// Create/update a customer in the OpenIAP service. If stripe has been configured, it will create or update a customer in stripe as well
4243    /// A customer is a customer object that can only be updated using this function, and 2 roles ( customername admins and customername users )
4244    #[tracing::instrument(skip_all)]
4245    pub async fn ensure_customer(
4246        &self,
4247        config: EnsureCustomerRequest,
4248        env: EnvConfig,
4249    ) -> Result<EnsureCustomerResponse, OpenIAPError> {
4250        if config.customer.is_none() && config.stripe.is_none() {
4251            return Err(OpenIAPError::ClientError(
4252                "No customer or stripe provided".to_string(),
4253            ));
4254        }
4255        let mut envelope = config.to_envelope();
4256        if !env.jwt.is_empty() {
4257            envelope.jwt = env.jwt;
4258        }
4259        if !env.spanid.is_empty() {
4260            envelope.spanid = env.spanid;
4261        }
4262        if !env.traceid.is_empty() {
4263            envelope.traceid = env.traceid;
4264        }
4265        let result = self.send(envelope, None).await;
4266        match result {
4267            Ok(m) => {
4268                let data = match m.data {
4269                    Some(d) => d,
4270                    None => {
4271                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4272                    }
4273                };
4274                if m.command == "error" {
4275                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4276                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4277                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4278                }
4279                let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
4280                    .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4281                Ok(response)
4282            }
4283            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4284        }
4285    }
4286    /// Create a new workflow instance, to be used to workflow in/out nodes in NodeRED
4287    #[tracing::instrument(skip_all)]
4288    pub async fn create_workflow_instance(
4289        &self,
4290        config: CreateWorkflowInstanceRequest,
4291        env: EnvConfig,
4292    ) -> Result<String, OpenIAPError> {
4293        if config.workflowid.is_empty() {
4294            return Err(OpenIAPError::ClientError(
4295                "No workflow id provided".to_string(),
4296            ));
4297        }
4298        let mut envelope = config.to_envelope();
4299        if !env.jwt.is_empty() {
4300            envelope.jwt = env.jwt;
4301        }
4302        if !env.spanid.is_empty() {
4303            envelope.spanid = env.spanid;
4304        }
4305        if !env.traceid.is_empty() {
4306            envelope.traceid = env.traceid;
4307        }
4308        let result = self.send(envelope, None).await;
4309        match result {
4310            Ok(m) => {
4311                let data = match m.data {
4312                    Some(d) => d,
4313                    None => {
4314                        return Err(OpenIAPError::ClientError("No data in response".to_string()));
4315                    }
4316                };
4317                if m.command == "error" {
4318                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4319                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4320                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4321                }
4322                let response: CreateWorkflowInstanceResponse =
4323                    prost::Message::decode(data.value.as_ref())
4324                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4325                Ok(response.instanceid)
4326            }
4327            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4328        }
4329    }
4330
4331    /// Invoke a workflow in the OpenRPA robot where robotid is the userid of the user the robot is running as, or a roleid with RPA enabled
4332    #[tracing::instrument(skip_all)]
4333    pub async fn invoke_openrpa(
4334        &self,
4335        config: InvokeOpenRpaRequest,
4336        env: EnvConfig,
4337        timeout: Option<tokio::time::Duration>,
4338    ) -> Result<String, OpenIAPError> {
4339        if config.robotid.is_empty() {
4340            return Err(OpenIAPError::ClientError(
4341                "No robot id provided".to_string(),
4342            ));
4343        }
4344        if config.workflowid.is_empty() {
4345            return Err(OpenIAPError::ClientError(
4346                "No workflow id provided".to_string(),
4347            ));
4348        }
4349
4350        let (tx, rx) = oneshot::channel::<String>();
4351        let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
4352
4353        let q = self
4354            .register_queue(
4355                RegisterQueueRequest {
4356                    queuename: "".to_string(),
4357                },
4358                crate::EnvConfig::new(),
4359                Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
4360                    let tx = tx.clone();
4361                    Box::pin(async move {
4362                        let json = event.data.clone();
4363                        let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
4364                        let command: String = obj["command"].as_str().unwrap().to_string();
4365                        debug!("Received event: {:?}", event);
4366                        if command.eq("invokesuccess") {
4367                            debug!("Robot successfully started running workflow");
4368                        } else if command.eq("invokeidle") {
4369                            debug!("Workflow went idle");
4370                        } else if command.eq("invokeerror") {
4371                            debug!("Robot failed to run workflow");
4372                            let tx = tx.lock().unwrap().take().unwrap();
4373                            tx.send(event.data).unwrap();
4374                        } else if command.eq("timeout") {
4375                            debug!("No robot picked up the workflow");
4376                            let tx = tx.lock().unwrap().take().unwrap();
4377                            tx.send(event.data).unwrap();
4378                        } else if command.eq("invokecompleted") {
4379                            debug!("Robot completed running workflow");
4380                            let tx = tx.lock().unwrap().take().unwrap();
4381                            tx.send(event.data).unwrap();
4382                        } else {
4383                            let tx = tx.lock().unwrap().take().unwrap();
4384                            tx.send(event.data).unwrap();
4385                        }
4386                        None
4387                    })
4388                }),
4389            )
4390            .await
4391            .unwrap();
4392        debug!("Registered Response Queue: {:?}", q);
4393        let data = format!(
4394            "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"data\": {}}}",
4395            config.workflowid, config.payload
4396        );
4397        debug!("Send Data: {}", data);
4398        debug!("To Queue: {} With reply to: {}", config.robotid, q);
4399        let config = QueueMessageRequest {
4400            queuename: config.robotid.clone(),
4401            replyto: q.clone(),
4402            data,
4403            ..Default::default()
4404        };
4405        let mut envelope = config.to_envelope();
4406        if !env.jwt.is_empty() {
4407            envelope.jwt = env.jwt;
4408        }
4409        if !env.spanid.is_empty() {
4410            envelope.spanid = env.spanid;
4411        }
4412        if !env.traceid.is_empty() {
4413            envelope.traceid = env.traceid;
4414        }
4415        let result = self.send(envelope, timeout).await;
4416        match result {
4417            Ok(m) => {
4418                let data = match m.data {
4419                    Some(d) => d,
4420                    None => {
4421                        return Err(OpenIAPError::ClientError("No data in response".to_string()))
4422                    }
4423                };
4424                if m.command == "error" {
4425                    let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4426                        .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4427                    return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4428                }
4429                // prost::Message::decode(data.value.as_ref())
4430                //     .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4431
4432                let duration = timeout.unwrap_or_else(|| self.get_default_timeout());
4433                // let json = rx.await.unwrap();
4434                let json = match tokio::time::timeout(duration, rx).await {
4435                    Ok(Ok(val)) => {
4436                        // Success, unregister queue before returning
4437                        let _ = self.unregister_queue(crate::EnvConfig::new(), &q).await;
4438                        val
4439                    },
4440                    Ok(Err(e)) => {
4441                        let _ = self.unregister_queue(crate::EnvConfig::new(), &q).await;
4442                        return Err(OpenIAPError::CustomError(e.to_string()));
4443                    },
4444                    Err(_) => {
4445                        let _ = self.unregister_queue(crate::EnvConfig::new(), &q).await;
4446                        return Err(OpenIAPError::ServerError("Timeout".to_string()));
4447                    },
4448                };
4449                debug!("Received json result: {:?}", json);
4450                let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
4451                let command: String = obj["command"].as_str().unwrap().to_string();
4452                let mut data = "".to_string();
4453                if obj["data"].as_str().is_some() {
4454                    data = obj["data"].as_str().unwrap().to_string();
4455                } else if obj["data"].as_object().is_some() {
4456                    data = obj["data"].to_string();
4457                }
4458                if !command.eq("invokecompleted") {
4459                    if command.eq("timeout") {
4460                        return Err(OpenIAPError::ServerError("Timeout".to_string()));
4461                    } else {
4462                        if data.is_empty() {
4463                            return Err(OpenIAPError::ServerError(
4464                                "Error with no message".to_string(),
4465                            ));
4466                        }
4467                        return Err(OpenIAPError::ServerError(data));
4468                    }
4469                }
4470                // let response = self.unregister_queue(&q).await;
4471                // match response {
4472                //     Ok(_) => {
4473                //         debug!("Unregistered Response Queue: {:?}", q);
4474                //     }
4475                //     Err(e) => {
4476                //         error!("Failed to unregister Response Queue: {:?}", e);
4477                //     }
4478                // }
4479                Ok(data)
4480            }
4481            Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4482        }
4483    }
4484}
4485