1pub mod fs;
2mod metadata_detector;
3mod profile;
4pub mod push;
5mod typing;
6mod util;
7
8pub use menmos_client::{BuildError, Query};
9pub use profile::{Config, Profile};
10pub use typing::{FileMetadata, UploadRequest};
11
12use metadata_detector::{MetadataDetector, MetadataDetectorRC};
13use typing::*;
14
15use std::sync::Arc;
16use std::time;
17
18use async_stream::try_stream;
19
20use futures::{TryStream, TryStreamExt};
21use interface::Hit;
22
23pub use interface;
24
25use menmos_client::Client;
26
27use snafu::prelude::*;
28
29#[derive(Debug, Snafu)]
30pub enum MenmosError {
31 ConfigLoad {
32 source: error::ProfileError,
33 },
34
35 #[snafu(display("profile '{}' does not exist", profile))]
36 ProfileLoad {
37 profile: String,
38 },
39
40 #[snafu(display("failed to build client"))]
41 ClientBuild {
42 source: BuildError,
43 },
44
45 FilePush {
46 source: error::PushError,
47 },
48
49 DirectoryRead {
50 source: std::io::Error,
51 },
52
53 Query {
54 source: util::UtilError,
55 },
56}
57
58type Result<T> = std::result::Result<T, MenmosError>;
59
60mod error {
61 pub use super::MenmosError;
62 pub use crate::fs::FsError;
63 pub use crate::metadata_detector::MetadataDetectorError;
64 pub use crate::profile::ProfileError;
65 pub use crate::push::PushError;
66}
67
68fn load_profile_from_config(profile: &str) -> Result<Profile> {
69 let config = Config::load().context(ConfigLoadSnafu)?;
70 config
71 .profiles
72 .get(profile)
73 .cloned()
74 .context(ProfileLoadSnafu {
75 profile: String::from(profile),
76 })
77}
78
79#[derive(Clone)]
81pub struct Menmos {
82 pub fs: fs::MenmosFs,
86
87 client: ClientRC,
88
89 metadata_detector: MetadataDetectorRC,
90}
91
92impl Menmos {
93 fn new_with_client(client: Client) -> Self {
94 let client_rc = Arc::new(client);
95 let fs = fs::MenmosFs::new(client_rc.clone());
96
97 let metadata_detector = Arc::new(MetadataDetector::new().unwrap());
99
100 Self {
101 fs,
102 client: client_rc,
103 metadata_detector,
104 }
105 }
106
107 pub async fn new(profile: &str) -> Result<Self> {
108 let profile = load_profile_from_config(profile)?;
109 let client = Client::builder()
110 .with_host(profile.host)
111 .with_username(profile.username)
112 .with_password(profile.password)
113 .build()
114 .await
115 .context(ClientBuildSnafu)?;
116 Ok(Self::new_with_client(client))
117 }
118
119 pub fn builder(profile: &str) -> MenmosBuilder {
121 MenmosBuilder::new(profile.into())
122 }
123
124 pub fn client(&self) -> &Client {
126 self.client.as_ref()
127 }
128
129 pub fn query(&self, query: Query) -> impl TryStream<Ok = Hit, Error = MenmosError> + Unpin {
131 util::scroll_query(query, &self.client).map_err(|e| MenmosError::Query { source: e })
132 }
133
134 pub fn push_files(
136 &self,
137 requests: Vec<UploadRequest>,
138 ) -> impl TryStream<Ok = push::PushResult, Error = MenmosError> + Unpin {
139 let client = self.client.clone();
140 let metadata_detector = self.metadata_detector.clone();
141
142 Box::pin(try_stream! {
143 let mut working_stack = Vec::new();
144 working_stack.extend(requests);
145
146 while let Some(upload_request) = working_stack.pop(){
147 if upload_request.path.is_file() {
148 let source_path = upload_request.path.clone();
149 let blob_id = push::push_file(client.clone(), &metadata_detector, upload_request).await.map_err(|e| MenmosError::FilePush{source: e})?;
150 yield push::PushResult{source_path, blob_id};
151 } else {
152 let directory_id: String = push::push_file(
153 client.clone(),
154 &metadata_detector,
155 upload_request.clone() )
156 .await.context(FilePushSnafu)?;
157
158 let read_dir_result: Result<std::fs::ReadDir> = upload_request.path.read_dir().map_err(|e| MenmosError::DirectoryRead{source: e});
160 for child in read_dir_result?.filter_map(|f| f.ok()) {
161 let mut req_clone = upload_request.clone();
162 req_clone.path = child.path().clone();
163 req_clone.fields.insert("parent".to_string(), directory_id.clone());
164 working_stack.push(req_clone);
165 }
166 }
167 }
168 })
169 }
170}
171
172pub struct MenmosBuilder {
173 profile: String,
174 request_timeout: Option<time::Duration>,
175}
176
177impl MenmosBuilder {
178 pub(crate) fn new(profile: String) -> Self {
179 Self {
180 profile,
181 request_timeout: None,
182 }
183 }
184
185 #[must_use]
186 pub fn with_request_timeout(mut self, request_timeout: time::Duration) -> Self {
187 self.request_timeout = Some(request_timeout);
188 self
189 }
190
191 pub async fn build(self) -> Result<Menmos> {
192 let profile = load_profile_from_config(&self.profile)?;
193 let mut builder = Client::builder()
194 .with_host(profile.host)
195 .with_username(profile.username)
196 .with_password(profile.password);
197
198 if let Some(request_timeout) = self.request_timeout {
199 builder = builder.with_request_timeout(request_timeout);
200 }
201
202 let client = builder.build().await.context(ClientBuildSnafu)?;
203
204 Ok(Menmos::new_with_client(client))
205 }
206}