use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
use crate::module::registry::cache::{CachedModule, LocalCache};
use crate::module::registry::cas::{ContentAddressableStorage, ModuleHash};
use crate::module::registry::manifest::ModuleManifest;
use crate::module::security::signing::ModuleSigner;
use crate::module::traits::{module_error_msg, ModuleError};
/// A remote peer that a `ModuleRegistry` can ask for modules.
#[derive(Debug, Clone)]
pub struct RegistryMirror {
/// Transport address used to reach the peer when requesting a module.
pub peer_addr: crate::network::transport::TransportAddr,
/// Optional public key associated with the mirror.
/// NOTE(review): encoding (hex/base64/...) is not shown in this file — confirm.
pub public_key: Option<String>,
/// NOTE(review): presumably the time this mirror was last verified; the unit
/// is not established in this file — confirm.
pub last_verified: Option<u64>,
/// NOTE(review): presumably a trust score for the mirror; its range and how
/// it is updated are not shown in this file — confirm.
pub reputation: f64,
}
/// A module as returned by the registry: identifying metadata, content hashes,
/// the parsed manifest and, when available, the module binary.
#[derive(Debug, Clone)]
pub struct ModuleEntry {
/// Hash identifying the module as a whole.
/// NOTE(review): how it is derived from manifest/binary is not shown here — confirm.
pub hash: ModuleHash,
/// Module name; also used to build the cached binary file name.
pub name: String,
/// Module version string.
pub version: String,
/// Expected hash of the TOML-serialized manifest (checked by `verify_entry`).
pub manifest_hash: ModuleHash,
/// Expected hash of the module binary (checked whenever a binary is present).
pub binary_hash: ModuleHash,
/// Parsed module manifest.
pub manifest: ModuleManifest,
/// Raw module binary, or `None` when no binary is available for this entry.
pub binary: Option<Vec<u8>>,
}
/// Resolves modules by name, first from a local cache and then from remote
/// mirrors, verifying hashes and manifest signatures along the way.
pub struct ModuleRegistry {
// Peers queried, in order, when a module is not served from the cache.
mirrors: Vec<RegistryMirror>,
// Index of already-fetched modules; consulted first by `fetch_module`.
local_cache: Arc<RwLock<LocalCache>>,
// Content-addressable store holding manifests and binaries by hash.
cas: Arc<RwLock<ContentAddressableStorage>>,
// Verifies manifest signatures in `verify_entry`.
signer: ModuleSigner,
// Directory the cache index is loaded from / saved to; also the base
// directory for cached binary paths.
cache_dir: PathBuf,
// Needed for peer-to-peer fetching; `None` until set via
// `with_network_manager` / `set_network_manager`.
network_manager: Option<Arc<crate::network::NetworkManager>>,
}
impl ModuleRegistry {
    /// Returns another shared handle to the content-addressable storage.
    ///
    /// Only the reference count is bumped; the storage itself is not copied.
    pub fn cas(&self) -> Arc<RwLock<ContentAddressableStorage>> {
        self.cas.clone()
    }

    /// Returns another shared handle to the local module cache.
    ///
    /// Only the reference count is bumped; the cache itself is not copied.
    pub fn local_cache(&self) -> Arc<RwLock<LocalCache>> {
        self.local_cache.clone()
    }
}
impl ModuleRegistry {
pub fn new<P: AsRef<Path>>(
cache_dir: P,
cas_dir: P,
mirrors: Vec<RegistryMirror>,
) -> Result<Self, ModuleError> {
let cache_dir = cache_dir.as_ref().to_path_buf();
let cas_dir = cas_dir.as_ref().to_path_buf();
let local_cache = Arc::new(RwLock::new(
LocalCache::load(&cache_dir).unwrap_or_else(|_| LocalCache::new()),
));
let cas = Arc::new(RwLock::new(ContentAddressableStorage::new(cas_dir)?));
Ok(Self {
mirrors,
local_cache,
cas,
signer: ModuleSigner::new(),
cache_dir,
network_manager: None,
})
}
/// Builder-style variant of `set_network_manager`: attaches the network
/// manager and hands the registry back by value.
pub fn with_network_manager(
    mut self,
    network_manager: Arc<crate::network::NetworkManager>,
) -> Self {
    self.set_network_manager(network_manager);
    self
}
/// Attaches (or replaces) the network manager used for peer-to-peer fetching.
pub fn set_network_manager(&mut self, network_manager: Arc<crate::network::NetworkManager>) {
    let manager_slot = &mut self.network_manager;
    *manager_slot = Some(network_manager);
}
/// Resolves module `name`, preferring the local cache over the mirrors.
///
/// A cache hit is used only if the cache reports it as valid and it can be
/// loaded; any failure there falls through to the mirrors. An entry obtained
/// from a mirror is written to the cache before being returned.
///
/// # Errors
/// * `ModuleError::ModuleNotFound` when neither the cache nor any mirror
///   yields the module.
/// * Any error from caching a freshly fetched entry.
pub async fn fetch_module(&self, name: &str) -> Result<ModuleEntry, ModuleError> {
    // The read guard is scoped so it is released before mirrors are contacted.
    {
        let index = self.local_cache.read().await;
        match index.get(name) {
            Some(record) if index.is_valid(name) => {
                debug!("Module {} found in cache", name);
                match self.load_from_cache(record).await {
                    Ok(entry) => return Ok(entry),
                    // An unreadable cache entry is treated as a cache miss.
                    Err(_) => {}
                }
            }
            _ => {}
        }
    }

    let fetched = if self.mirrors.is_empty() {
        None
    } else {
        self.fetch_from_mirrors(name).await.ok()
    };

    match fetched {
        Some(entry) => {
            self.cache_entry(&entry).await?;
            Ok(entry)
        }
        None => Err(ModuleError::ModuleNotFound(format!(
            "Module {name} not found in cache or mirrors"
        ))),
    }
}
async fn fetch_from_mirrors(&self, name: &str) -> Result<ModuleEntry, ModuleError> {
for mirror in &self.mirrors {
match self.fetch_from_peer(&mirror.peer_addr, name).await {
Ok(entry) => {
if self.verify_entry(&entry).await.is_ok() {
return Ok(entry);
}
}
Err(_) => {
continue;
}
}
}
Err(ModuleError::ModuleNotFound(format!(
"Module {name} not found in any peer"
)))
}
async fn fetch_from_peer(
&self,
peer_addr: &crate::network::transport::TransportAddr,
name: &str,
) -> Result<ModuleEntry, ModuleError> {
let network_manager = self.network_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(
module_error_msg::NETWORK_MANAGER_NOT_SET_P2P_FETCHING.to_string(),
)
})?;
let socket_addr = match peer_addr {
crate::network::transport::TransportAddr::Tcp(addr) => *addr,
#[cfg(feature = "quinn")]
crate::network::transport::TransportAddr::Quinn(addr) => *addr,
#[cfg(feature = "iroh")]
crate::network::transport::TransportAddr::Iroh(_) => {
return Err(ModuleError::OperationError(
module_error_msg::IROH_TRANSPORT_NOT_SUPPORTED_MODULE_FETCHING.to_string(),
));
}
};
let (request_id, response_rx) = network_manager.register_request(socket_addr);
use crate::network::protocol::{GetModuleMessage, ProtocolMessage, ProtocolParser};
let get_module_msg = GetModuleMessage {
request_id,
name: name.to_string(),
version: None, payment_id: None, };
let message_wire =
ProtocolParser::serialize_message(&ProtocolMessage::GetModule(get_module_msg))
.map_err(|e| ModuleError::op_err("Failed to serialize GetModule", e))?;
network_manager
.send_to_peer(socket_addr, message_wire)
.await
.map_err(|e| ModuleError::op_err("Failed to send GetModule to peer", e))?;
let response_data = tokio::time::timeout(tokio::time::Duration::from_secs(30), response_rx)
.await
.map_err(|_| ModuleError::Timeout)?
.map_err(|_| {
ModuleError::OperationError(module_error_msg::RESPONSE_CHANNEL_CLOSED.to_string())
})?;
let parsed = ProtocolParser::parse_message(&response_data)
.map_err(|e| ModuleError::op_err("Failed to parse Module response", e))?;
let module_msg = match parsed {
ProtocolMessage::Module(msg) => msg,
_ => {
return Err(ModuleError::OperationError(
module_error_msg::EXPECTED_MODULE_MESSAGE.to_string(),
))
}
};
if module_msg.request_id != request_id {
return Err(ModuleError::OperationError(
module_error_msg::REQUEST_ID_MISMATCH.to_string(),
));
}
let manifest_str = String::from_utf8(module_msg.manifest.clone())
.map_err(|e| ModuleError::InvalidManifest(format!("Failed to decode manifest: {e}")))?;
let manifest: ModuleManifest = toml::from_str(&manifest_str)
.map_err(|e| ModuleError::InvalidManifest(format!("Failed to parse manifest: {e}")))?;
Ok(ModuleEntry {
hash: module_msg.hash,
name: module_msg.name,
version: module_msg.version,
manifest_hash: module_msg.manifest_hash,
binary_hash: module_msg.binary_hash,
manifest,
binary: module_msg.binary,
})
}
async fn verify_and_create_entry(
&self,
data: HashMap<String, serde_json::Value>,
) -> Result<ModuleEntry, ModuleError> {
let name = data
.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MISSING_NAME_FIELD.to_string())
})?
.to_string();
let version = data
.get("version")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MISSING_VERSION_FIELD.to_string())
})?
.to_string();
let hash_str = data.get("hash").and_then(|v| v.as_str()).ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MISSING_HASH_FIELD.to_string())
})?;
let manifest_hash_str = data
.get("manifest_hash")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ModuleError::OperationError(
module_error_msg::MISSING_MANIFEST_HASH_FIELD.to_string(),
)
})?;
let binary_hash_str = data
.get("binary_hash")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MISSING_BINARY_HASH_FIELD.to_string())
})?;
let hash = hex::decode(hash_str)
.map_err(|e| ModuleError::op_err("Invalid hash hex", e))?
.try_into()
.map_err(|_| {
ModuleError::OperationError(module_error_msg::HASH_MUST_BE_32_BYTES.to_string())
})?;
let manifest_hash = hex::decode(manifest_hash_str)
.map_err(|e| ModuleError::op_err("Invalid manifest_hash hex", e))?
.try_into()
.map_err(|_| {
ModuleError::OperationError(
module_error_msg::MANIFEST_HASH_MUST_BE_32_BYTES.to_string(),
)
})?;
let binary_hash = hex::decode(binary_hash_str)
.map_err(|e| ModuleError::op_err("Invalid binary_hash hex", e))?
.try_into()
.map_err(|_| {
ModuleError::OperationError(
module_error_msg::BINARY_HASH_MUST_BE_32_BYTES.to_string(),
)
})?;
let manifest = if let Some(manifest_str) = data.get("manifest").and_then(|v| v.as_str()) {
toml::from_str::<ModuleManifest>(manifest_str)
.map_err(|e| ModuleError::op_err("Invalid manifest TOML", e))?
} else if let Some(manifest_obj) = data.get("manifest") {
serde_json::from_value::<ModuleManifest>(manifest_obj.clone())
.map_err(|e| ModuleError::op_err("Invalid manifest JSON", e))?
} else {
return Err(ModuleError::OperationError(
module_error_msg::MISSING_MANIFEST_FIELD.to_string(),
));
};
let cas = self.cas.read().await;
let manifest_bytes = toml::to_string(&manifest)
.map_err(|e| ModuleError::op_err("Failed to serialize manifest", e))?
.into_bytes();
if !cas.verify(&manifest_bytes, &manifest_hash) {
return Err(ModuleError::CryptoError(
"Manifest hash verification failed".to_string(),
));
}
let binary = data
.get("binary")
.and_then(|v| {
v.as_str().and_then(|s| hex::decode(s).ok()).or_else(|| {
v.as_array().map(|arr| {
arr.iter()
.filter_map(|item| item.as_u64().map(|n| n as u8))
.collect()
})
})
})
.and_then(|bin_data| {
if cas.verify(&bin_data, &binary_hash) {
Some(bin_data)
} else {
None
}
});
Ok(ModuleEntry {
hash,
name,
version,
manifest_hash,
binary_hash,
manifest,
binary,
})
}
/// Rebuilds a [`ModuleEntry`] from a cache record.
///
/// The manifest is read from the content-addressable storage and checked
/// against `cached.manifest_hash`. The binary is taken from
/// `cached.local_path` when that file exists; otherwise, if the storage holds
/// an object for `cached.binary_hash`, it is read from there (`cache_entry`
/// puts binaries into the storage but does not write `local_path`, so without
/// this fallback a cached module would come back without its binary). Any
/// binary found is checked against `cached.binary_hash`.
///
/// # Errors
/// * Errors from the storage when an object cannot be read.
/// * `ModuleError::CryptoError` on a manifest or binary hash mismatch.
/// * `ModuleError::InvalidManifest` if the stored manifest is not valid
///   UTF-8 TOML.
/// * `ModuleError::OperationError` if `local_path` exists but cannot be read.
async fn load_from_cache(&self, cached: &CachedModule) -> Result<ModuleEntry, ModuleError> {
    // Read and verify the manifest under a single read guard.
    let manifest_data = {
        let cas = self.cas.read().await;
        let data = cas.get(&cached.manifest_hash)?;
        if !cas.verify(&data, &cached.manifest_hash) {
            return Err(ModuleError::CryptoError(
                "Cached manifest hash mismatch".to_string(),
            ));
        }
        data
    };

    let manifest_str = String::from_utf8(manifest_data).map_err(|e| {
        ModuleError::InvalidManifest(format!("Failed to decode cached manifest: {e}"))
    })?;
    let manifest: ModuleManifest = toml::from_str(&manifest_str).map_err(|e| {
        ModuleError::InvalidManifest(format!("Failed to parse cached manifest: {e}"))
    })?;

    // Read the file directly instead of `exists()` followed by `read()`:
    // one syscall fewer and no window in which the file can disappear
    // between the check and the read. A missing file simply means "no local
    // copy"; other I/O errors are reported.
    let local_binary = match std::fs::read(&cached.local_path) {
        Ok(bytes) => Some(bytes),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
        Err(e) => return Err(ModuleError::op_err("Failed to read cached binary", e)),
    };

    let binary = {
        let cas = self.cas.read().await;
        let candidate = match local_binary {
            Some(bytes) => Some(bytes),
            // No local file: fall back to the copy kept in the storage.
            None if cas.has(&cached.binary_hash) => Some(cas.get(&cached.binary_hash)?),
            None => None,
        };
        if let Some(ref bin_data) = candidate {
            if !cas.verify(bin_data, &cached.binary_hash) {
                return Err(ModuleError::CryptoError(
                    "Cached binary hash mismatch".to_string(),
                ));
            }
        }
        candidate
    };

    Ok(ModuleEntry {
        hash: cached.hash,
        name: cached.name.clone(),
        version: cached.version.clone(),
        manifest_hash: cached.manifest_hash,
        binary_hash: cached.binary_hash,
        manifest,
        binary,
    })
}
/// Persists `entry` locally: its manifest (and binary, if present) go into
/// the content-addressable storage, and a record is added to the cache index,
/// which is then saved to `cache_dir`.
///
/// Objects whose hash is already in the storage are not stored again. The
/// record's `local_path` is `<cache_dir>/<name>.bin`; this function only
/// records that path and does not write a file there.
///
/// # Errors
/// Fails if the manifest cannot be serialized, the storage rejects an
/// object, or the cache index cannot be saved.
async fn cache_entry(&self, entry: &ModuleEntry) -> Result<(), ModuleError> {
    let serialized_manifest = match toml::to_string(&entry.manifest) {
        Ok(text) => text,
        Err(e) => return Err(ModuleError::op_err("Failed to serialize manifest", e)),
    };

    // Scope the storage write guard so it is released before the cache
    // index lock is taken.
    {
        let mut storage = self.cas.write().await;
        if !storage.has(&entry.manifest_hash) {
            storage.store(serialized_manifest.as_bytes())?;
        }
        match entry.binary {
            Some(ref payload) if !storage.has(&entry.binary_hash) => {
                storage.store(payload)?;
            }
            _ => {}
        }
    }

    let mut index = self.local_cache.write().await;
    let binary_file_name = format!("{}.bin", entry.name);
    let record = CachedModule {
        name: entry.name.clone(),
        version: entry.version.clone(),
        hash: entry.hash,
        manifest_hash: entry.manifest_hash,
        binary_hash: entry.binary_hash,
        verified_at: crate::utils::current_timestamp(),
        verified_by: vec![],
        local_path: self.cache_dir.join(binary_file_name),
        expires_at: None,
    };
    index.cache(record);
    index.update_sync_time();
    index.save(&self.cache_dir)?;
    Ok(())
}
/// Verifies the integrity of `entry`.
///
/// Checks, in order:
/// 1. if the manifest carries signatures, that they satisfy the manifest's
///    threshold for its public keys;
/// 2. that the TOML-serialized manifest matches `entry.manifest_hash`;
/// 3. if a binary is present, that it matches `entry.binary_hash`.
///
/// # Errors
/// * `ModuleError::OperationError` if the manifest cannot be serialized.
/// * `ModuleError::CryptoError` if a signature threshold is missing, the
///   signatures do not verify, or a hash does not match.
/// * Any error returned by the signer itself.
pub async fn verify_entry(&self, entry: &ModuleEntry) -> Result<(), ModuleError> {
    // Serialize once; both the signature check and the hash check use the
    // same bytes.
    let manifest_data = toml::to_string(&entry.manifest)
        .map_err(|e| ModuleError::op_err("Failed to serialize manifest", e))?;
    let manifest_bytes = manifest_data.as_bytes();

    if entry.manifest.has_signatures() {
        let signatures = entry.manifest.get_signatures();
        let public_keys = entry.manifest.get_public_keys();
        let threshold = entry.manifest.get_threshold().ok_or_else(|| {
            ModuleError::CryptoError("Signature threshold not specified".to_string())
        })?;
        let valid =
            self.signer
                .verify_manifest(manifest_bytes, &signatures, &public_keys, threshold)?;
        if !valid {
            return Err(ModuleError::CryptoError(format!(
                "Manifest signature verification failed for module {}",
                entry.name
            )));
        }
    }

    let cas = self.cas.read().await;
    if !cas.verify(manifest_bytes, &entry.manifest_hash) {
        return Err(ModuleError::CryptoError(
            "Manifest hash mismatch".to_string(),
        ));
    }
    if let Some(ref binary_data) = entry.binary {
        if !cas.verify(binary_data, &entry.binary_hash) {
            return Err(ModuleError::CryptoError("Binary hash mismatch".to_string()));
        }
    }
    Ok(())
}
}