memflow_registry/storage/
mod.rs1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use chrono::{NaiveDateTime, Utc};
6use log::warn;
7use memflow::plugins::plugin_analyzer;
8use memflow::plugins::plugin_analyzer::PluginDescriptorInfo;
9use parking_lot::{lock_api::RwLockReadGuard, RawRwLock, RwLock};
10use serde::{Deserialize, Serialize};
11use tokio::fs::File;
12use tokio::io::AsyncWriteExt;
13
14use crate::{
15 error::{Error, Result},
16 pki::SignatureVerifier,
17};
18
19pub mod database;
20use database::PluginDatabase;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct PluginMetadata {
25 pub digest: String,
27 pub signature: String,
29 pub created_at: NaiveDateTime,
31 pub descriptors: Vec<PluginDescriptorInfo>,
33}
34
35#[derive(Clone)]
37pub struct Storage {
38 root: PathBuf,
39 database: Arc<RwLock<PluginDatabase>>,
40 signature_verifier: Option<SignatureVerifier>,
41}
42
43#[derive(Debug, Serialize, Deserialize)]
45pub enum UploadResponse {
46 Added,
47 AlreadyExists,
48}
49
50impl Storage {
51 pub fn new<P: AsRef<Path>>(root: P) -> Result<Self> {
52 let mut database = PluginDatabase::new();
54
55 let paths = std::fs::read_dir(&root)?;
56 for path in paths.filter_map(|p| p.ok()) {
57 if let Some(extension) = path.path().extension() {
58 if extension.to_str().unwrap_or_default() == "meta" {
59 let metadata: PluginMetadata =
60 serde_json::from_str(&std::fs::read_to_string(path.path())?)
61 .map_err(|e| Error::Parse(e.to_string()))?;
62 database.insert_all(&metadata)?;
63 }
64 }
65 }
66
67 Ok(Self {
68 root: root.as_ref().to_path_buf(),
69 database: Arc::new(RwLock::new(database)),
70 signature_verifier: None,
71 })
72 }
73
74 pub fn with_signature_verifier(mut self, verifier: SignatureVerifier) -> Self {
76 self.signature_verifier = Some(verifier);
77 self
78 }
79
80 pub async fn upload(&self, bytes: &[u8], signature: &str) -> Result<UploadResponse> {
82 if let Some(verifier) = &self.signature_verifier {
84 if let Err(err) = verifier.is_valid(bytes, signature) {
85 warn!("invalid file signature for uploaded binary: {}", err);
86 return Err(Error::Signature("file signature is invalid".to_owned()));
87 }
88 }
89
90 let descriptors = plugin_analyzer::parse_descriptors(bytes)?;
92
93 let digest = sha256::digest(bytes);
95
96 let mut file_name = self.root.clone().join(&digest);
98 file_name.set_extension("plugin");
99
100 if file_name.exists() {
102 warn!("plugin with the same digest was already added");
103 return Ok(UploadResponse::AlreadyExists);
104 }
105
106 let mut plugin_file = File::create(&file_name).await?;
108 plugin_file.write_all(bytes).await?;
109
110 let metadata = PluginMetadata {
112 digest: digest.clone(),
113 signature: signature.to_owned(),
114 created_at: Utc::now().naive_utc(),
115 descriptors: descriptors.clone(),
116 };
117 file_name.set_extension("meta");
118 let mut metadata_file = File::create(&file_name).await?;
119 metadata_file
120 .write_all(serde_json::to_string(&metadata).unwrap().as_bytes())
121 .await?;
122
123 let mut database = self.database.write();
125 database.insert_all(&metadata)?;
126
127 Ok(UploadResponse::Added)
128 }
129
130 pub async fn download(&self, digest: &str) -> Result<File> {
132 let mut file_name = self.root.clone().join(digest);
133 file_name.set_extension("plugin");
134 Ok(File::open(&file_name).await?)
135 }
136
137 pub async fn metadata(&self, digest: &str) -> Result<PluginMetadata> {
139 let mut file_name = self.root.clone().join(digest);
140 file_name.set_extension("meta");
141 let content = tokio::fs::read_to_string(&file_name).await?;
142 Ok(serde_json::from_str(&content).unwrap())
143 }
144
145 pub async fn delete(&self, digest: &str) -> Result<()> {
147 let mut file_name = self.root.clone().join(digest);
149 file_name.set_extension("plugin");
150 if !file_name.exists() {
151 return Err(Error::NotFound("digest was not found".to_owned()));
152 }
153
154 {
156 let mut database = self.database.write();
157 database.delete_by_digest(digest);
158 }
159
160 tokio::fs::remove_file(file_name).await?;
162
163 Ok(())
164 }
165
166 #[inline]
168 pub fn health(&self) -> Result<()> {
169 let paths = std::fs::read_dir(&self.root)?;
170 for _path in paths.filter_map(|p| p.ok()) {
171 }
173 Ok(())
174 }
175
176 #[inline]
178 pub fn database(&self) -> RwLockReadGuard<RawRwLock, PluginDatabase> {
179 self.database.read()
180 }
181}