memflow_registry/storage/
mod.rs

1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use chrono::{NaiveDateTime, Utc};
6use log::warn;
7use memflow::plugins::plugin_analyzer;
8use memflow::plugins::plugin_analyzer::PluginDescriptorInfo;
9use parking_lot::{lock_api::RwLockReadGuard, RawRwLock, RwLock};
10use serde::{Deserialize, Serialize};
11use tokio::fs::File;
12use tokio::io::AsyncWriteExt;
13
14use crate::{
15    error::{Error, Result},
16    pki::SignatureVerifier,
17};
18
19pub mod database;
20use database::PluginDatabase;
21
22/// Metadata attached to each file
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct PluginMetadata {
25    /// The sha256sum of the binary file
26    pub digest: String,
27    /// File signature of this binary
28    pub signature: String,
29    /// Timestamp at which the file was added
30    pub created_at: NaiveDateTime,
31    /// The plugin descriptor
32    pub descriptors: Vec<PluginDescriptorInfo>,
33}
34
35/// Local Plugin storage
36#[derive(Clone)]
37pub struct Storage {
38    root: PathBuf,
39    database: Arc<RwLock<PluginDatabase>>,
40    signature_verifier: Option<SignatureVerifier>,
41}
42
43/// Result of an upload request
44#[derive(Debug, Serialize, Deserialize)]
45pub enum UploadResponse {
46    Added,
47    AlreadyExists,
48}
49
50impl Storage {
51    pub fn new<P: AsRef<Path>>(root: P) -> Result<Self> {
52        // TODO: create path if not exists
53        let mut database = PluginDatabase::new();
54
55        let paths = std::fs::read_dir(&root)?;
56        for path in paths.filter_map(|p| p.ok()) {
57            if let Some(extension) = path.path().extension() {
58                if extension.to_str().unwrap_or_default() == "meta" {
59                    let metadata: PluginMetadata =
60                        serde_json::from_str(&std::fs::read_to_string(path.path())?)
61                            .map_err(|e| Error::Parse(e.to_string()))?;
62                    database.insert_all(&metadata)?;
63                }
64            }
65        }
66
67        Ok(Self {
68            root: root.as_ref().to_path_buf(),
69            database: Arc::new(RwLock::new(database)),
70            signature_verifier: None,
71        })
72    }
73
74    /// Adds the given SignatureVerifier to the file store.
75    pub fn with_signature_verifier(mut self, verifier: SignatureVerifier) -> Self {
76        self.signature_verifier = Some(verifier);
77        self
78    }
79
80    /// Writes the specified connector into the path and adds it into the database.
81    pub async fn upload(&self, bytes: &[u8], signature: &str) -> Result<UploadResponse> {
82        // TODO: what happens with old signatures in case we change the signing key?
83        if let Some(verifier) = &self.signature_verifier {
84            if let Err(err) = verifier.is_valid(bytes, signature) {
85                warn!("invalid file signature for uploaded binary: {}", err);
86                return Err(Error::Signature("file signature is invalid".to_owned()));
87            }
88        }
89
90        // parse descriptors
91        let descriptors = plugin_analyzer::parse_descriptors(bytes)?;
92
93        // generate sha256 digest
94        let digest = sha256::digest(bytes);
95
96        // plugin path: {digest}.plugin
97        let mut file_name = self.root.clone().join(&digest);
98        file_name.set_extension("plugin");
99
100        // check if digest is already existent
101        if file_name.exists() {
102            warn!("plugin with the same digest was already added");
103            return Ok(UploadResponse::AlreadyExists);
104        }
105
106        // write plugin
107        let mut plugin_file = File::create(&file_name).await?;
108        plugin_file.write_all(bytes).await?;
109
110        // metadata path: {digest}.meta
111        let metadata = PluginMetadata {
112            digest: digest.clone(),
113            signature: signature.to_owned(),
114            created_at: Utc::now().naive_utc(),
115            descriptors: descriptors.clone(),
116        };
117        file_name.set_extension("meta");
118        let mut metadata_file = File::create(&file_name).await?;
119        metadata_file
120            .write_all(serde_json::to_string(&metadata).unwrap().as_bytes())
121            .await?;
122
123        // add to database
124        let mut database = self.database.write();
125        database.insert_all(&metadata)?;
126
127        Ok(UploadResponse::Added)
128    }
129
130    /// Returns a handle to the file
131    pub async fn download(&self, digest: &str) -> Result<File> {
132        let mut file_name = self.root.clone().join(digest);
133        file_name.set_extension("plugin");
134        Ok(File::open(&file_name).await?)
135    }
136
137    /// Returns the metadata of the file
138    pub async fn metadata(&self, digest: &str) -> Result<PluginMetadata> {
139        let mut file_name = self.root.clone().join(digest);
140        file_name.set_extension("meta");
141        let content = tokio::fs::read_to_string(&file_name).await?;
142        Ok(serde_json::from_str(&content).unwrap())
143    }
144
145    /// Deletes the file with the given digest from the database.
146    pub async fn delete(&self, digest: &str) -> Result<()> {
147        // check if file exists
148        let mut file_name = self.root.clone().join(digest);
149        file_name.set_extension("plugin");
150        if !file_name.exists() {
151            return Err(Error::NotFound("digest was not found".to_owned()));
152        }
153
154        // lock and remove from database
155        {
156            let mut database = self.database.write();
157            database.delete_by_digest(digest);
158        }
159
160        // try to remove the file
161        tokio::fs::remove_file(file_name).await?;
162
163        Ok(())
164    }
165
166    /// Returns the health state of the database by checking if the storage folder is still accessible
167    #[inline]
168    pub fn health(&self) -> Result<()> {
169        let paths = std::fs::read_dir(&self.root)?;
170        for _path in paths.filter_map(|p| p.ok()) {
171            // no-op
172        }
173        Ok(())
174    }
175
176    /// Returns a read-only lock to the underlying database
177    #[inline]
178    pub fn database(&self) -> RwLockReadGuard<RawRwLock, PluginDatabase> {
179        self.database.read()
180    }
181}