menmos/
lib.rs

1pub mod fs;
2mod metadata_detector;
3mod profile;
4pub mod push;
5mod typing;
6mod util;
7
8pub use menmos_client::{BuildError, Query};
9pub use profile::{Config, Profile};
10pub use typing::{FileMetadata, UploadRequest};
11
12use metadata_detector::{MetadataDetector, MetadataDetectorRC};
13use typing::*;
14
15use std::sync::Arc;
16use std::time;
17
18use async_stream::try_stream;
19
20use futures::{TryStream, TryStreamExt};
21use interface::Hit;
22
23pub use interface;
24
25use menmos_client::Client;
26
27use snafu::prelude::*;
28
29#[derive(Debug, Snafu)]
30pub enum MenmosError {
31    ConfigLoad {
32        source: error::ProfileError,
33    },
34
35    #[snafu(display("profile '{}' does not exist", profile))]
36    ProfileLoad {
37        profile: String,
38    },
39
40    #[snafu(display("failed to build client"))]
41    ClientBuild {
42        source: BuildError,
43    },
44
45    FilePush {
46        source: error::PushError,
47    },
48
49    DirectoryRead {
50        source: std::io::Error,
51    },
52
53    Query {
54        source: util::UtilError,
55    },
56}
57
58type Result<T> = std::result::Result<T, MenmosError>;
59
60mod error {
61    pub use super::MenmosError;
62    pub use crate::fs::FsError;
63    pub use crate::metadata_detector::MetadataDetectorError;
64    pub use crate::profile::ProfileError;
65    pub use crate::push::PushError;
66}
67
68fn load_profile_from_config(profile: &str) -> Result<Profile> {
69    let config = Config::load().context(ConfigLoadSnafu)?;
70    config
71        .profiles
72        .get(profile)
73        .cloned()
74        .context(ProfileLoadSnafu {
75            profile: String::from(profile),
76        })
77}
78
79/// The menmos client.
80#[derive(Clone)]
81pub struct Menmos {
82    /// The filesystem interface to menmos.
83    ///
84    /// This interface should be used when manipulating concepts that are similar to files and folders.
85    pub fs: fs::MenmosFs,
86
87    client: ClientRC,
88
89    metadata_detector: MetadataDetectorRC,
90}
91
92impl Menmos {
93    fn new_with_client(client: Client) -> Self {
94        let client_rc = Arc::new(client);
95        let fs = fs::MenmosFs::new(client_rc.clone());
96
97        // If this fails we shipped a bad library.
98        let metadata_detector = Arc::new(MetadataDetector::new().unwrap());
99
100        Self {
101            fs,
102            client: client_rc,
103            metadata_detector,
104        }
105    }
106
107    pub async fn new(profile: &str) -> Result<Self> {
108        let profile = load_profile_from_config(profile)?;
109        let client = Client::builder()
110            .with_host(profile.host)
111            .with_username(profile.username)
112            .with_password(profile.password)
113            .build()
114            .await
115            .context(ClientBuildSnafu)?;
116        Ok(Self::new_with_client(client))
117    }
118
119    /// Get a builder to configure the client.
120    pub fn builder(profile: &str) -> MenmosBuilder {
121        MenmosBuilder::new(profile.into())
122    }
123
124    /// Get a reference to the internal low-level menmos client.
125    pub fn client(&self) -> &Client {
126        self.client.as_ref()
127    }
128
129    /// Get a stream of results for a given query.
130    pub fn query(&self, query: Query) -> impl TryStream<Ok = Hit, Error = MenmosError> + Unpin {
131        util::scroll_query(query, &self.client).map_err(|e| MenmosError::Query { source: e })
132    }
133
134    /// Recursively push a sequence of files and/or directories to the menmos cluster.
135    pub fn push_files(
136        &self,
137        requests: Vec<UploadRequest>,
138    ) -> impl TryStream<Ok = push::PushResult, Error = MenmosError> + Unpin {
139        let client = self.client.clone();
140        let metadata_detector = self.metadata_detector.clone();
141
142        Box::pin(try_stream! {
143            let mut working_stack = Vec::new();
144            working_stack.extend(requests);
145
146            while let Some(upload_request) = working_stack.pop(){
147                if upload_request.path.is_file() {
148                    let source_path = upload_request.path.clone();
149                    let blob_id = push::push_file(client.clone(), &metadata_detector, upload_request).await.map_err(|e| MenmosError::FilePush{source: e})?;
150                    yield push::PushResult{source_path, blob_id};
151                } else {
152                    let directory_id: String = push::push_file(
153                        client.clone(),
154                        &metadata_detector,
155                        upload_request.clone()                    )
156                    .await.context(FilePushSnafu)?;
157
158                    // Add this directory's children to the working stack.
159                    let read_dir_result: Result<std::fs::ReadDir> = upload_request.path.read_dir().map_err(|e| MenmosError::DirectoryRead{source: e});
160                    for child in read_dir_result?.filter_map(|f| f.ok()) {
161                        let mut req_clone = upload_request.clone();
162                        req_clone.path = child.path().clone();
163                        req_clone.fields.insert("parent".to_string(), directory_id.clone());
164                        working_stack.push(req_clone);
165                    }
166                }
167            }
168        })
169    }
170}
171
/// Builder for [`Menmos`], obtained from [`Menmos::builder`].
#[derive(Debug, Clone)]
pub struct MenmosBuilder {
    /// Name of the configuration profile to load when building.
    profile: String,

    /// Optional request timeout; `None` leaves the low-level client builder's setting untouched.
    request_timeout: Option<time::Duration>,
}
176
177impl MenmosBuilder {
178    pub(crate) fn new(profile: String) -> Self {
179        Self {
180            profile,
181            request_timeout: None,
182        }
183    }
184
185    #[must_use]
186    pub fn with_request_timeout(mut self, request_timeout: time::Duration) -> Self {
187        self.request_timeout = Some(request_timeout);
188        self
189    }
190
191    pub async fn build(self) -> Result<Menmos> {
192        let profile = load_profile_from_config(&self.profile)?;
193        let mut builder = Client::builder()
194            .with_host(profile.host)
195            .with_username(profile.username)
196            .with_password(profile.password);
197
198        if let Some(request_timeout) = self.request_timeout {
199            builder = builder.with_request_timeout(request_timeout);
200        }
201
202        let client = builder.build().await.context(ClientBuildSnafu)?;
203
204        Ok(Menmos::new_with_client(client))
205    }
206}