zenoh_backend_fs/
lib.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{collections::HashMap, fs::DirBuilder, future::Future, io::prelude::*, path::PathBuf};
16
17use async_trait::async_trait;
18use tempfile::tempfile_in;
19use tracing::{debug, warn};
20use zenoh::{
21    bytes::{Encoding, ZBytes},
22    internal::{bail, zenoh_home, zerror},
23    key_expr::{keyexpr, OwnedKeyExpr},
24    query::Parameters,
25    time::Timestamp,
26    try_init_log_from_env, Result as ZResult,
27};
28use zenoh_backend_traits::{
29    config::{StorageConfig, VolumeConfig},
30    Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
31    VolumeInstance,
32};
33use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin};
34
35mod data_info_mgt;
36mod files_mgt;
37use files_mgt::*;
38
39const WORKER_THREAD_NUM: usize = 2;
40const MAX_BLOCK_THREAD_NUM: usize = 50;
41lazy_static::lazy_static! {
42    // The global runtime is used in the dynamic plugins, which we can't get the current runtime
43    static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
44               .worker_threads(WORKER_THREAD_NUM)
45               .max_blocking_threads(MAX_BLOCK_THREAD_NUM)
46               .enable_all()
47               .build()
48               .expect("Unable to create runtime");
49}
50#[inline(always)]
51fn blockon_runtime<F: Future>(task: F) -> F::Output {
52    // Check whether able to get the current runtime
53    match tokio::runtime::Handle::try_current() {
54        Ok(rt) => {
55            // Able to get the current runtime (standalone binary), spawn on the current runtime
56            tokio::task::block_in_place(|| rt.block_on(task))
57        }
58        Err(_) => {
59            // Unable to get the current runtime (dynamic plugins), spawn on the global runtime
60            tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task))
61        }
62    }
63}
64
65/// The environement variable used to configure the root of all storages managed by this FileSystemBackend.
66pub const SCOPE_ENV_VAR: &str = "ZENOH_BACKEND_FS_ROOT";
67
68/// The default root (whithin zenoh's home directory) if the ZENOH_BACKEND_FS_ROOT environment variable is not specified.
69pub const DEFAULT_ROOT_DIR: &str = "zenoh_backend_fs";
70
71// Properies used by the Backend
72//  - None
73
74// Properies used by the Storage
75pub const PROP_STORAGE_READ_ONLY: &str = "read_only";
76pub const PROP_STORAGE_DIR: &str = "dir";
77pub const PROP_STORAGE_ON_CLOSURE: &str = "on_closure";
78pub const PROP_STORAGE_FOLLOW_LINK: &str = "follow_links";
79pub const PROP_STORAGE_KEEP_MIME: &str = "keep_mime_types";
80
81// Special key for None (when the prefix being stripped exactly matches the key)
82pub const ROOT_KEY: &str = "@root";
83
84pub struct FileSystemBackend {}
85
86#[cfg(feature = "dynamic_plugin")]
87zenoh_plugin_trait::declare_plugin!(FileSystemBackend);
88
89impl Plugin for FileSystemBackend {
90    type StartArgs = VolumeConfig;
91    type Instance = VolumeInstance;
92
93    const DEFAULT_NAME: &'static str = "filesystem_backend";
94    const PLUGIN_VERSION: &'static str = plugin_version!();
95    const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!();
96
97    fn start(_name: &str, _config: &Self::StartArgs) -> ZResult<Self::Instance> {
98        try_init_log_from_env();
99        debug!("FileSystem backend {}", Self::PLUGIN_VERSION);
100
101        let root_path = if let Some(dir) = std::env::var_os(SCOPE_ENV_VAR) {
102            PathBuf::from(dir)
103        } else {
104            let mut dir = PathBuf::from(zenoh_home());
105            dir.push(DEFAULT_ROOT_DIR);
106            dir
107        };
108        if let Err(e) = std::fs::create_dir_all(&root_path) {
109            bail!(
110                r#"Failed to create directory ${{{}}}={}: {}"#,
111                SCOPE_ENV_VAR,
112                root_path.display(),
113                e
114            );
115        }
116        let root = match dunce::canonicalize(&root_path) {
117            Ok(dir) => dir,
118            Err(e) => bail!(
119                r#"Invalid path for ${{{}}}={}: {}"#,
120                SCOPE_ENV_VAR,
121                root_path.display(),
122                e
123            ),
124        };
125        debug!("Using root dir: {}", root.display());
126
127        let mut parameters = Parameters::default();
128        parameters.insert::<String, String>("root".into(), root.to_string_lossy().into());
129        parameters.insert::<String, String>("version".into(), Self::PLUGIN_VERSION.into());
130
131        let admin_status = HashMap::from(parameters)
132            .into_iter()
133            .map(|(k, v)| (k, serde_json::Value::String(v)))
134            .collect();
135        Ok(Box::new(FileSystemVolume { admin_status, root }))
136    }
137}
138
139pub struct FileSystemVolume {
140    admin_status: serde_json::Value,
141    root: PathBuf,
142}
143
144fn extract_bool(
145    from: &serde_json::Map<String, serde_json::Value>,
146    key: &str,
147    default: bool,
148) -> ZResult<bool> {
149    match from.get(key) {
150        Some(serde_json::Value::Bool(s)) => Ok(*s),
151        None => Ok(default),
152        _ => bail!(
153            r#"Invalid value for File System Storage configuration: `{}` must be a boolean"#,
154            key
155        ),
156    }
157}
158
159#[async_trait]
160impl Volume for FileSystemVolume {
161    fn get_admin_status(&self) -> serde_json::Value {
162        self.admin_status.clone()
163    }
164
165    fn get_capability(&self) -> Capability {
166        Capability {
167            persistence: Persistence::Durable,
168            history: History::Latest,
169        }
170    }
171
172    async fn create_storage(&self, mut config: StorageConfig) -> ZResult<Box<dyn Storage>> {
173        let volume_cfg = match config.volume_cfg.as_object() {
174            Some(v) => v,
175            None => bail!("fs backed volumes require volume-specific configuration"),
176        };
177
178        let read_only = extract_bool(volume_cfg, PROP_STORAGE_READ_ONLY, false)?;
179        let follow_links = extract_bool(volume_cfg, PROP_STORAGE_FOLLOW_LINK, false)?;
180        let keep_mime = extract_bool(volume_cfg, PROP_STORAGE_KEEP_MIME, true)?;
181        let on_closure = match config.volume_cfg.get(PROP_STORAGE_ON_CLOSURE) {
182            Some(serde_json::Value::String(s)) if s == "delete_all" => OnClosure::DeleteAll,
183            Some(serde_json::Value::String(s)) if s == "do_nothing" => OnClosure::DoNothing,
184            None => OnClosure::DoNothing,
185            Some(s) => {
186                bail!(
187                    r#"Unsupported value {:?} for `on_closure` property: must be either "delete_all" or "do_nothing". Default is "do_nothing""#,
188                    s
189                )
190            }
191        };
192
193        let base_dir =
194            if let Some(serde_json::Value::String(dir)) = config.volume_cfg.get(PROP_STORAGE_DIR) {
195                let dir_path = PathBuf::from(dir.as_str());
196                if dir_path.is_absolute() {
197                    bail!(
198                        r#"Invalid property "{}"="{}": the path must be relative"#,
199                        PROP_STORAGE_DIR,
200                        dir
201                    );
202                }
203                if dir_path
204                    .components()
205                    .any(|c| c == std::path::Component::ParentDir)
206                {
207                    bail!(
208                        r#"Invalid property "{}"="{}": the path must not contain any '..'"#,
209                        PROP_STORAGE_DIR,
210                        dir
211                    );
212                }
213
214                // prepend base_dir with self.root
215                let mut base_dir = self.root.clone();
216                base_dir.push(dir_path);
217                base_dir
218            } else {
219                bail!(
220                    r#"Missing required property for File System Storage: "{}""#,
221                    PROP_STORAGE_DIR
222                )
223            };
224
225        // check if base_dir exists and is readable (and writeable if not "read_only" mode)
226        let mut dir_builder = DirBuilder::new();
227        dir_builder.recursive(true);
228        let base_dir_path = PathBuf::from(&base_dir);
229        if !base_dir_path.exists() {
230            if let Err(err) = dir_builder.create(&base_dir) {
231                bail!(
232                    r#"Cannot create File System Storage on "dir"={:?} : {}"#,
233                    base_dir,
234                    err
235                )
236            }
237        } else if !base_dir_path.is_dir() {
238            bail!(
239                r#"Cannot create File System Storage on "dir"={:?} : this is not a directory"#,
240                base_dir
241            )
242        } else if let Err(err) = base_dir_path.read_dir() {
243            bail!(
244                r#"Cannot create File System Storage on "dir"={:?} : {}"#,
245                base_dir,
246                err
247            )
248        } else if !read_only {
249            // try to write a random file
250            let _ = tempfile_in(&base_dir)
251                .map(|mut f| writeln!(f, "test"))
252                .map_err(|err| {
253                    zerror!(
254                        r#"Cannot create writeable File System Storage on "dir"={:?} : {}"#,
255                        base_dir,
256                        err
257                    )
258                })?;
259        }
260
261        config
262            .volume_cfg
263            .as_object_mut()
264            .unwrap()
265            .insert("dir_full_path".into(), base_dir.to_string_lossy().into());
266
267        tracing::debug!(
268            "Storage on {} will store files in {}",
269            config.key_expr,
270            base_dir.display()
271        );
272
273        let files_mgr = FilesMgr::new(base_dir, follow_links, keep_mime, on_closure).await?;
274        Ok(Box::new(FileSystemStorage {
275            config,
276            files_mgr,
277            read_only,
278        }))
279    }
280}
281
282struct FileSystemStorage {
283    config: StorageConfig,
284    files_mgr: FilesMgr,
285    read_only: bool,
286}
287
288#[async_trait]
289impl Storage for FileSystemStorage {
290    fn get_admin_status(&self) -> serde_json::Value {
291        self.config.to_json_value()
292    }
293
294    async fn put(
295        &mut self,
296        key: Option<OwnedKeyExpr>,
297        payload: ZBytes,
298        encoding: Encoding,
299        timestamp: Timestamp,
300    ) -> ZResult<StorageInsertionResult> {
301        if !self.read_only {
302            if let Some(k) = key {
303                let k = k.as_str();
304                let zfile = self.files_mgr.to_zfile(k);
305                // write file
306                self.files_mgr
307                    .write_file(&zfile, payload.into(), encoding, &timestamp)
308                    .await?;
309                Ok(StorageInsertionResult::Inserted)
310            } else {
311                let zfile = self.files_mgr.to_zfile(ROOT_KEY);
312                // write file
313                self.files_mgr
314                    .write_file(&zfile, payload.into(), encoding, &timestamp)
315                    .await?;
316                Ok(StorageInsertionResult::Inserted)
317            }
318        } else {
319            warn!(
320                "Received PUT for read-only Files System Storage on {:?} - ignored",
321                self.files_mgr.base_dir()
322            );
323            Err("Received update for read-only File System Storage".into())
324        }
325    }
326
327    /// Function called for each incoming delete request to this storage.
328    async fn delete(
329        &mut self,
330        key: Option<OwnedKeyExpr>,
331        _timestamp: Timestamp,
332    ) -> ZResult<StorageInsertionResult> {
333        if !self.read_only {
334            if let Some(k) = key {
335                let k = k.as_str();
336                let zfile = self.files_mgr.to_zfile(k);
337                // delete file
338                self.files_mgr.delete_file(&zfile).await?;
339                Ok(StorageInsertionResult::Deleted)
340            } else {
341                let zfile = self.files_mgr.to_zfile(ROOT_KEY);
342                // delete file
343                self.files_mgr.delete_file(&zfile).await?;
344                Ok(StorageInsertionResult::Deleted)
345            }
346        } else {
347            warn!(
348                "Received DELETE for read-only Files System Storage on {:?} - ignored",
349                self.files_mgr.base_dir()
350            );
351            Err("Received update for read-only File System Storage".into())
352        }
353    }
354
355    /// Function to retrieve the sample associated with a single key.
356    async fn get(
357        &mut self,
358        key: Option<OwnedKeyExpr>,
359        _parameters: &str,
360    ) -> ZResult<Vec<StoredData>> {
361        if key.is_some() {
362            let k = key.clone().unwrap();
363            let k = k.as_str();
364            let zfile = self.files_mgr.to_zfile(k);
365            match self.files_mgr.read_file(&zfile).await {
366                Ok(Some((payload, encoding, timestamp))) => Ok(vec![StoredData {
367                    payload,
368                    encoding,
369                    timestamp,
370                }]),
371                Ok(None) => Ok(vec![]),
372                Err(e) => {
373                    Err(format!("Get key {:?} : failed to read file {} : {}", key, zfile, e).into())
374                }
375            }
376        } else {
377            let zfile = self.files_mgr.to_zfile(ROOT_KEY);
378            match self.files_mgr.read_file(&zfile).await {
379                Ok(Some((payload, encoding, timestamp))) => Ok(vec![StoredData {
380                    payload,
381                    encoding,
382                    timestamp,
383                }]),
384                Ok(None) => Ok(vec![]),
385                Err(e) => {
386                    Err(format!("Get key {:?} : failed to read file {} : {}", key, zfile, e).into())
387                }
388            }
389        }
390    }
391
392    async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>> {
393        let mut result = Vec::new();
394        // Add the root entry if it exists.
395        // Root key can't be acuired from `matching_files` call
396        // because it's name is specially chosen to be not allowed as key value ("@root")
397        if let Some((_, _, timestamp)) = self
398            .files_mgr
399            .read_file(&self.files_mgr.to_zfile(ROOT_KEY))
400            .await?
401        {
402            result.push((None, timestamp));
403        }
404        // Get all files in the filesystem.
405        // Also skip the root key file which was already added above.
406        // This is just for completeness, it's skipped anyway due to it's name starting from '@'
407        for zfile in self
408            .files_mgr
409            .matching_files(unsafe { keyexpr::from_str_unchecked("**") })
410            .filter(|zfile| zfile.zpath != ROOT_KEY)
411        {
412            let trimmed_zpath = get_trimmed_keyexpr(zfile.zpath.as_ref());
413            let trimmed_zfile = self.files_mgr.to_zfile(trimmed_zpath);
414            match self.files_mgr.read_file(&trimmed_zfile).await {
415                Ok(Some((_, _, timestamp))) => {
416                    let zpath = Some(zfile.zpath.as_ref().try_into().unwrap());
417                    result.push((zpath, timestamp));
418                }
419                Ok(None) => (), // file not found, do nothing
420                Err(e) => warn!(
421                    "Getting all entries : failed to read file {} : {}",
422                    zfile, e
423                ),
424            }
425        }
426        Ok(result)
427    }
428}