1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#[cfg(feature = "curl")]
pub mod authenticator;
#[cfg(feature = "bucket")]
pub mod bucket;
#[cfg(feature = "bucket")]
pub mod bucket_select;
pub mod counter;
#[cfg(feature = "curl")]
pub mod curl;
pub mod in_memory;
pub mod io;
pub mod local;
#[cfg(feature = "mongodb")]
pub mod mongodb;
pub mod paginator;
#[cfg(feature = "psql")]
pub mod psql;

#[cfg(feature = "bucket")]
use self::bucket::Bucket;
#[cfg(feature = "bucket")]
use self::bucket_select::BucketSelect;
#[cfg(feature = "curl")]
use self::curl::Curl;
use self::in_memory::InMemory;
use self::io::Io;
use self::local::Local;
#[cfg(feature = "mongodb")]
use self::mongodb::Mongodb;
#[cfg(feature = "psql")]
use self::psql::Psql;
use crate::document::Document;
use crate::DataSet;
use crate::DataStream;
use crate::Metadata;
use async_trait::async_trait;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;

/// All the connectors that can be described in a configuration.
///
/// The variant is selected by the `type` field of the serialized value. Each variant is
/// serialized under the name documented on it; the listed aliases are only additional
/// names accepted when deserializing. Variants guarded by a `cfg(feature = ...)`
/// attribute exist only when that cargo feature is enabled.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum ConnectorType {
    /// `type = "in_memory"` (alias: `"mem"`).
    #[serde(rename = "in_memory", alias = "mem")]
    InMemory(InMemory),
    /// `type = "io"`.
    #[serde(rename = "io")]
    Io(Io),
    /// `type = "local"`.
    #[serde(rename = "local")]
    Local(Local),
    /// `type = "bucket"`. Requires the `bucket` feature.
    #[cfg(feature = "bucket")]
    #[serde(rename = "bucket")]
    Bucket(Bucket),
    /// `type = "bucket_select"`. Requires the `bucket` feature.
    #[cfg(feature = "bucket")]
    #[serde(rename = "bucket_select")]
    BucketSelect(BucketSelect),
    /// `type = "curl"`. Requires the `curl` feature.
    #[cfg(feature = "curl")]
    #[serde(rename = "curl")]
    Curl(Curl),
    /// `type = "mongodb"` (alias: `"mongo"`). Requires the `mongodb` feature.
    #[cfg(feature = "mongodb")]
    #[serde(rename = "mongodb", alias = "mongo")]
    Mongodb(Mongodb),
    /// `type = "psql"` (aliases: `"pgsql"`, `"pg"`). Requires the `psql` feature.
    #[cfg(feature = "psql")]
    #[serde(rename = "psql", alias = "pgsql", alias = "pg")]
    Psql(Psql),
}

impl Default for ConnectorType {
    /// The default connector type is the `Io` variant wrapping `Io::default()`.
    fn default() -> Self {
        let fallback = Io::default();
        Self::Io(fallback)
    }
}

impl ConnectorType {
    /// Consume the enum and return the wrapped connector as a boxed trait object.
    pub fn boxed_inner(self) -> Box<dyn Connector> {
        // One arm per variant, without a catch-all, so the match stays exhaustive
        // whatever combination of features is enabled.
        match self {
            Self::InMemory(inner) => Box::new(inner),
            Self::Io(inner) => Box::new(inner),
            Self::Local(inner) => Box::new(inner),
            #[cfg(feature = "bucket")]
            Self::Bucket(inner) => Box::new(inner),
            #[cfg(feature = "bucket")]
            Self::BucketSelect(inner) => Box::new(inner),
            #[cfg(feature = "curl")]
            Self::Curl(inner) => Box::new(inner),
            #[cfg(feature = "mongodb")]
            Self::Mongodb(inner) => Box::new(inner),
            #[cfg(feature = "psql")]
            Self::Psql(inner) => Box::new(inner),
        }
    }
}

impl ConnectorType {
    /// Borrow the wrapped connector as a trait object, without consuming the enum.
    pub fn inner(&self) -> &dyn Connector {
        // One arm per variant, without a catch-all, so the match stays exhaustive
        // whatever combination of features is enabled.
        match *self {
            Self::InMemory(ref inner) => inner,
            Self::Io(ref inner) => inner,
            Self::Local(ref inner) => inner,
            #[cfg(feature = "bucket")]
            Self::Bucket(ref inner) => inner,
            #[cfg(feature = "bucket")]
            Self::BucketSelect(ref inner) => inner,
            #[cfg(feature = "curl")]
            Self::Curl(ref inner) => inner,
            #[cfg(feature = "mongodb")]
            Self::Mongodb(ref inner) => inner,
            #[cfg(feature = "psql")]
            Self::Psql(ref inner) => inner,
        }
    }
}

/// Struct that implement this trait can get a reader or writer in order to do something on a document.
#[async_trait]
pub trait Connector: Send + Sync + std::fmt::Debug + ConnectorClone + Unpin {
    fn set_document(&mut self, _document: Box<dyn Document>) -> Result<()> {
        Ok(())
    }
    fn document(&self) -> Result<&Box<dyn Document>> {
        Err(Error::new(
            ErrorKind::Unsupported,
            "function not implemented",
        ))
    }
    fn is_resource_will_change(&self, new_parameters: Value) -> Result<bool>;
    /// Set parameters.
    fn set_parameters(&mut self, parameters: Value);
    /// Set the connector metadata that can change with the document metadata.
    fn set_metadata(&mut self, _metadata: Metadata) {}
    /// Get the connector metadata
    fn metadata(&self) -> Metadata {
        Metadata::default()
    }
    /// Test if the connector is variable and if the context change, the resource will change.
    fn is_variable(&self) -> bool;
    /// Check if the resource is empty.
    async fn is_empty(&self) -> Result<bool> {
        Ok(0 == self.len().await?)
    }
    /// Get the resource size of the current path.
    async fn len(&self) -> Result<usize> {
        Ok(0)
    }
    /// Path of the document
    fn path(&self) -> String;
    /// Fetch data from the resource and set the inner of the connector.
    async fn fetch(&mut self) -> std::io::Result<Option<DataStream>>;
    /// Send the data from the inner connector to the remote resource.
    async fn send(&mut self, dataset: &DataSet) -> std::io::Result<Option<DataStream>>;
    /// Erase the content of the resource.
    async fn erase(&mut self) -> Result<()> {
        Err(Error::new(
            ErrorKind::Unsupported,
            "function not implemented",
        ))
    }
    /// Paginate through the current connector and return a stream of new connector with new parameters.
    async fn paginate(
        &self,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn Connector>>> + Send>>>;
}

impl fmt::Display for dyn Connector {
    /// Display a connector as the path of its document.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let path = self.path();
        f.write_str(&path)
    }
}
/// Cloning hook for connectors used as trait objects.
///
/// It is a supertrait of `Connector`, so `clone_box` can be called on a
/// `dyn Connector`.
pub trait ConnectorClone {
    /// Return a copy of the connector as a new boxed trait object.
    fn clone_box(&self) -> Box<dyn Connector>;
}

/// Every `'static` connector that is `Clone` gets `clone_box` for free.
impl<T: Connector + Clone + 'static> ConnectorClone for T {
    /// Clone the concrete connector and box the copy as a trait object.
    fn clone_box(&self) -> Box<dyn Connector> {
        let duplicate: T = self.clone();
        Box::new(duplicate)
    }
}

impl Clone for Box<dyn Connector> {
    fn clone(&self) -> Box<dyn Connector> {
        self.clone_box()
    }
}