1use std::{collections::HashMap, fs::DirBuilder, future::Future, io::prelude::*, path::PathBuf};
16
17use async_trait::async_trait;
18use tempfile::tempfile_in;
19use tracing::{debug, warn};
20use zenoh::{
21 bytes::{Encoding, ZBytes},
22 internal::{bail, zenoh_home, zerror},
23 key_expr::{keyexpr, OwnedKeyExpr},
24 query::Parameters,
25 time::Timestamp,
26 try_init_log_from_env, Result as ZResult,
27};
28use zenoh_backend_traits::{
29 config::{StorageConfig, VolumeConfig},
30 Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
31 VolumeInstance,
32};
33use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};
34
35mod data_info_mgt;
36mod files_mgt;
37use files_mgt::*;
38
39const WORKER_THREAD_NUM: usize = 2;
40const MAX_BLOCK_THREAD_NUM: usize = 50;
41lazy_static::lazy_static! {
42 static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
44 .worker_threads(WORKER_THREAD_NUM)
45 .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
46 .enable_all()
47 .build()
48 .expect("Unable to create runtime");
49}
50#[inline(always)]
51fn blockon_runtime<F: Future>(task: F) -> F::Output {
52 match tokio::runtime::Handle::try_current() {
54 Ok(rt) => {
55 tokio::task::block_in_place(|| rt.block_on(task))
57 }
58 Err(_) => {
59 tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
61 }
62 }
63}
64
65pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_FS_ROOT";
67
68pub const DEFAULT_ROOT_DIR: &str = "zenoh_backend_fs";
70
71pub const PROP_STORAGE_READ_ONLY: &str = "read_only";
76pub const PROP_STORAGE_DIR: &str = "dir";
77pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
78pub const PROP_STORAGE_FOLLOW_LINK: &str = "follow_links";
79pub const PROP_STORAGE_KEEP_MIME: &str = "keep_mime_types";
80
81pub const ROOT_KEY: &str = "@root";
83
84pub struct FileSystemBackend {}
85
86#[cfg(feature = "dynamic_plugin")]
87zenoh_plugin_trait::declare_plugin!(FileSystemBackend);
88
89impl Plugin for FileSystemBackend {
90 type StartArgs = VolumeConfig;
91 type Instance = VolumeInstance;
92
93 const DEFAULT_NAME: &'static str = "filesystem_backend";
94 const PLUGIN_VERSION: &'static str = plugin_version!();
95 const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
96
97 fn start(_name: &str, _config: &Self::StartArgs) -> ZResult<Self::Instance> {
98 try_init_log_from_env();
99 debug!("FileSystem backend {}", Self::PLUGIN_VERSION);
100
101 let root_path = if let Some(dir) = std::env::var_os(SCOPE_ENV_VAR) {
102 PathBuf::from(dir)
103 } else {
104 let mut dir = PathBuf::from(zenoh_home());
105 dir.push(DEFAULT_ROOT_DIR);
106 dir
107 };
108 if let Err(e) = std::fs::create_dir_all(&root_path) {
109 bail!(
110 r#"Failed to create directory ${{{}}}={}: {}"#,
111 SCOPE_ENV_VAR,
112 root_path.display(),
113 e
114 );
115 }
116 let root = match dunce::canonicalize(&root_path) {
117 Ok(dir) => dir,
118 Err(e) => bail!(
119 r#"Invalid path for ${{{}}}={}: {}"#,
120 SCOPE_ENV_VAR,
121 root_path.display(),
122 e
123 ),
124 };
125 debug!("Using root dir: {}", root.display());
126
127 let mut parameters = Parameters::default();
128 parameters.insert::<String, String>("root".into(), root.to_string_lossy().into());
129 parameters.insert::<String, String>("version".into(), Self::PLUGIN_VERSION.into());
130
131 let admin_status = HashMap::from(parameters)
132 .into_iter()
133 .map(|(k, v)| (k, serde_json::Value::String(v)))
134 .collect();
135 Ok(Box::new(FileSystemVolume { admin_status, root }))
136 }
137}
138
139pub struct FileSystemVolume {
140 admin_status: serde_json::Value,
141 root: PathBuf,
142}
143
144fn extract_bool(
145 from: &serde_json::Map<String, serde_json::Value>,
146 key: &str,
147 default: bool,
148) -> ZResult<bool> {
149 match from.get(key) {
150 Some(serde_json::Value::Bool(s)) => Ok(*s),
151 None => Ok(default),
152 _ => bail!(
153 r#"Invalid value for File System Storage configuration: `{}` must be a boolean"#,
154 key
155 ),
156 }
157}
158
159#[async_trait]
160impl Volume for FileSystemVolume {
161 fn get_admin_status(&self) -> serde_json::Value {
162 self.admin_status.clone()
163 }
164
165 fn get_capability(&self) -> Capability {
166 Capability {
167 persistence: Persistence::Durable,
168 history: History::Latest,
169 }
170 }
171
172 async fn create_storage(&self, mut config: StorageConfig) -> ZResult<Box<dyn Storage>> {
173 let volume_cfg = match config.volume_cfg.as_object() {
174 Some(v) => v,
175 None => bail!("fs backed volumes require volume-specific configuration"),
176 };
177
178 let read_only = extract_bool(volume_cfg, PROP_STORAGE_READ_ONLY, false)?;
179 let follow_links = extract_bool(volume_cfg, PROP_STORAGE_FOLLOW_LINK, false)?;
180 let keep_mime = extract_bool(volume_cfg, PROP_STORAGE_KEEP_MIME, true)?;
181 let on_closure = match config.volume_cfg.get(PROP_STORAGE_ON_CLOSURE) {
182 Some(serde_json::Value::String(s)) if s == "delete_all" => OnClosure::DeleteAll,
183 Some(serde_json::Value::String(s)) if s == "do_nothing" => OnClosure::DoNothing,
184 None => OnClosure::DoNothing,
185 Some(s) => {
186 bail!(
187 r#"Unsupported value {:?} for `on_closure` property: must be either "delete_all" or "do_nothing". Default is "do_nothing""#,
188 s
189 )
190 }
191 };
192
193 let base_dir =
194 if let Some(serde_json::Value::String(dir)) = config.volume_cfg.get(PROP_STORAGE_DIR) {
195 let dir_path = PathBuf::from(dir.as_str());
196 if dir_path.is_absolute() {
197 bail!(
198 r#"Invalid property "{}"="{}": the path must be relative"#,
199 PROP_STORAGE_DIR,
200 dir
201 );
202 }
203 if dir_path
204 .components()
205 .any(|c| c == std::path::Component::ParentDir)
206 {
207 bail!(
208 r#"Invalid property "{}"="{}": the path must not contain any '..'"#,
209 PROP_STORAGE_DIR,
210 dir
211 );
212 }
213
214 let mut base_dir = self.root.clone();
216 base_dir.push(dir_path);
217 base_dir
218 } else {
219 bail!(
220 r#"Missing required property for File System Storage: "{}""#,
221 PROP_STORAGE_DIR
222 )
223 };
224
225 let mut dir_builder = DirBuilder::new();
227 dir_builder.recursive(true);
228 let base_dir_path = PathBuf::from(&base_dir);
229 if !base_dir_path.exists() {
230 if let Err(err) = dir_builder.create(&base_dir) {
231 bail!(
232 r#"Cannot create File System Storage on "dir"={:?} : {}"#,
233 base_dir,
234 err
235 )
236 }
237 } else if !base_dir_path.is_dir() {
238 bail!(
239 r#"Cannot create File System Storage on "dir"={:?} : this is not a directory"#,
240 base_dir
241 )
242 } else if let Err(err) = base_dir_path.read_dir() {
243 bail!(
244 r#"Cannot create File System Storage on "dir"={:?} : {}"#,
245 base_dir,
246 err
247 )
248 } else if !read_only {
249 let _ = tempfile_in(&base_dir)
251 .map(|mut f| writeln!(f, "test"))
252 .map_err(|err| {
253 zerror!(
254 r#"Cannot create writeable File System Storage on "dir"={:?} : {}"#,
255 base_dir,
256 err
257 )
258 })?;
259 }
260
261 config
262 .volume_cfg
263 .as_object_mut()
264 .unwrap()
265 .insert("dir_full_path".into(), base_dir.to_string_lossy().into());
266
267 tracing::debug!(
268 "Storage on {} will store files in {}",
269 config.key_expr,
270 base_dir.display()
271 );
272
273 let files_mgr = FilesMgr::new(base_dir, follow_links, keep_mime, on_closure).await?;
274 Ok(Box::new(FileSystemStorage {
275 config,
276 files_mgr,
277 read_only,
278 }))
279 }
280}
281
282struct FileSystemStorage {
283 config: StorageConfig,
284 files_mgr: FilesMgr,
285 read_only: bool,
286}
287
288#[async_trait]
289impl Storage for FileSystemStorage {
290 fn get_admin_status(&self) -> serde_json::Value {
291 self.config.to_json_value()
292 }
293
294 async fn put(
295 &mut self,
296 key: Option<OwnedKeyExpr>,
297 payload: ZBytes,
298 encoding: Encoding,
299 timestamp: Timestamp,
300 ) -> ZResult<StorageInsertionResult> {
301 if !self.read_only {
302 if let Some(k) = key {
303 let k = k.as_str();
304 let zfile = self.files_mgr.to_zfile(k);
305 self.files_mgr
307 .write_file(&zfile, payload.into(), encoding, ×tamp)
308 .await?;
309 Ok(StorageInsertionResult::Inserted)
310 } else {
311 let zfile = self.files_mgr.to_zfile(ROOT_KEY);
312 self.files_mgr
314 .write_file(&zfile, payload.into(), encoding, ×tamp)
315 .await?;
316 Ok(StorageInsertionResult::Inserted)
317 }
318 } else {
319 warn!(
320 "Received PUT for read-only Files System Storage on {:?} - ignored",
321 self.files_mgr.base_dir()
322 );
323 Err("Received update for read-only File System Storage".into())
324 }
325 }
326
327 async fn delete(
329 &mut self,
330 key: Option<OwnedKeyExpr>,
331 _timestamp: Timestamp,
332 ) -> ZResult<StorageInsertionResult> {
333 if !self.read_only {
334 if let Some(k) = key {
335 let k = k.as_str();
336 let zfile = self.files_mgr.to_zfile(k);
337 self.files_mgr.delete_file(&zfile).await?;
339 Ok(StorageInsertionResult::Deleted)
340 } else {
341 let zfile = self.files_mgr.to_zfile(ROOT_KEY);
342 self.files_mgr.delete_file(&zfile).await?;
344 Ok(StorageInsertionResult::Deleted)
345 }
346 } else {
347 warn!(
348 "Received DELETE for read-only Files System Storage on {:?} - ignored",
349 self.files_mgr.base_dir()
350 );
351 Err("Received update for read-only File System Storage".into())
352 }
353 }
354
355 async fn get(
357 &mut self,
358 key: Option<OwnedKeyExpr>,
359 _parameters: &str,
360 ) -> ZResult<Vec<StoredData>> {
361 if key.is_some() {
362 let k = key.clone().unwrap();
363 let k = k.as_str();
364 let zfile = self.files_mgr.to_zfile(k);
365 match self.files_mgr.read_file(&zfile).await {
366 Ok(Some((payload, encoding, timestamp))) => Ok(vec![StoredData {
367 payload,
368 encoding,
369 timestamp,
370 }]),
371 Ok(None) => Ok(vec![]),
372 Err(e) => {
373 Err(format!("Get key {:?} : failed to read file {} : {}", key, zfile, e).into())
374 }
375 }
376 } else {
377 let zfile = self.files_mgr.to_zfile(ROOT_KEY);
378 match self.files_mgr.read_file(&zfile).await {
379 Ok(Some((payload, encoding, timestamp))) => Ok(vec![StoredData {
380 payload,
381 encoding,
382 timestamp,
383 }]),
384 Ok(None) => Ok(vec![]),
385 Err(e) => {
386 Err(format!("Get key {:?} : failed to read file {} : {}", key, zfile, e).into())
387 }
388 }
389 }
390 }
391
392 async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
393 let mut result = Vec::new();
394 if let Some((_, _, timestamp)) = self
398 .files_mgr
399 .read_file(&self.files_mgr.to_zfile(ROOT_KEY))
400 .await?
401 {
402 result.push((None, timestamp));
403 }
404 for zfile in self
408 .files_mgr
409 .matching_files(unsafe { keyexpr::from_str_unchecked("**") })
410 .filter(|zfile| zfile.zpath != ROOT_KEY)
411 {
412 let trimmed_zpath = get_trimmed_keyexpr(zfile.zpath.as_ref());
413 let trimmed_zfile = self.files_mgr.to_zfile(trimmed_zpath);
414 match self.files_mgr.read_file(&trimmed_zfile).await {
415 Ok(Some((_, _, timestamp))) => {
416 let zpath = Some(zfile.zpath.as_ref().try_into().unwrap());
417 result.push((zpath, timestamp));
418 }
419 Ok(None) => (), Err(e) => warn!(
421 "Getting all entries : failed to read file {} : {}",
422 zfile, e
423 ),
424 }
425 }
426 Ok(result)
427 }
428}