use crate::core::error::{Result, XLinkError};
use async_trait::async_trait;
use sha2::{Digest, Sha256};
use std::sync::Arc;
#[async_trait]
pub trait DistributedStore: Send + Sync {
async fn upload(&self, data: &[u8]) -> Result<String>;
async fn download(&self, hash: &str) -> Result<Vec<u8>>;
fn protocol_name(&self) -> &str;
}
pub struct FileDistributedStore {
base_path: std::path::PathBuf,
}
impl FileDistributedStore {
pub async fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
let base_path = path.as_ref().to_path_buf();
if !base_path.exists() {
tokio::fs::create_dir_all(&base_path)
.await
.map_err(Into::<XLinkError>::into)?;
}
Ok(Self { base_path })
}
fn compute_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
format!("Qm{}", hex::encode(result))
}
fn get_path(&self, hash: &str) -> std::path::PathBuf {
self.base_path.join(hash)
}
}
#[async_trait]
impl DistributedStore for FileDistributedStore {
async fn upload(&self, data: &[u8]) -> Result<String> {
let hash = Self::compute_hash(data);
let path = self.get_path(&hash);
tokio::fs::write(path, data)
.await
.map_err(Into::<XLinkError>::into)?;
log::info!("[DistStore] Uploaded {} bytes, CID: {}", data.len(), hash);
Ok(hash)
}
async fn download(&self, hash: &str) -> Result<Vec<u8>> {
let path = self.get_path(hash);
if !path.exists() {
return Err(XLinkError::device_not_found(
format!("Content not found for hash: {}", hash),
file!(),
));
}
let data = tokio::fs::read(path)
.await
.map_err(Into::<XLinkError>::into)?;
log::info!(
"[DistStore] Downloaded {} bytes from CID: {}",
data.len(),
hash
);
Ok(data)
}
fn protocol_name(&self) -> &str {
"FileSystem(IPFS-like)"
}
}
pub struct DistributedStorageAdapter {
distributed_store: Arc<dyn DistributedStore>,
local_cache: Arc<crate::storage::memory_store::MemoryStorage>,
}
impl DistributedStorageAdapter {
pub fn new(distributed_store: Arc<dyn DistributedStore>) -> Self {
Self {
distributed_store,
local_cache: Arc::new(crate::storage::memory_store::MemoryStorage::new()),
}
}
}
#[async_trait]
impl crate::core::traits::Storage for DistributedStorageAdapter {
async fn save_message(
&self,
message: &crate::core::types::Message,
) -> crate::core::error::Result<()> {
let data = serde_json::to_vec(message).map_err(Into::<XLinkError>::into)?;
let hash = self.distributed_store.upload(&data).await?;
let hash_message = crate::core::types::Message {
id: message.id,
sender: message.sender,
recipient: message.recipient,
group_id: message.group_id,
payload: crate::core::types::MessagePayload::Text(hash),
priority: message.priority,
timestamp: message.timestamp,
require_ack: message.require_ack,
};
self.local_cache.save_message(&hash_message).await
}
async fn get_pending_messages(
&self,
device_id: &crate::core::types::DeviceId,
) -> crate::core::error::Result<Vec<crate::core::types::Message>> {
let hash_messages = self.local_cache.get_pending_messages(device_id).await?;
let mut messages = Vec::new();
for hash_msg in hash_messages {
if let crate::core::types::MessagePayload::Text(hash) = &hash_msg.payload {
let data = self.distributed_store.download(hash).await?;
let message: crate::core::types::Message =
serde_json::from_slice(&data).map_err(Into::<XLinkError>::into)?;
messages.push(message);
}
}
Ok(messages)
}
async fn remove_message(&self, message_id: &uuid::Uuid) -> crate::core::error::Result<()> {
self.local_cache.remove_message(message_id).await
}
async fn save_audit_log(&self, log: String) -> crate::core::error::Result<()> {
self.local_cache.save_audit_log(log).await
}
async fn get_audit_logs(&self, limit: usize) -> crate::core::error::Result<Vec<String>> {
self.local_cache.get_audit_logs(limit).await
}
async fn cleanup_old_data(&self, days: u32) -> crate::core::error::Result<u64> {
self.local_cache.cleanup_old_data(days).await
}
async fn save_pending_message(
&self,
message: &crate::core::types::Message,
) -> crate::core::error::Result<()> {
self.local_cache.save_pending_message(message).await
}
async fn get_pending_messages_for_recovery(
&self,
device_id: &crate::core::types::DeviceId,
) -> crate::core::error::Result<Vec<crate::core::types::Message>> {
self.local_cache
.get_pending_messages_for_recovery(device_id)
.await
}
async fn remove_pending_message(
&self,
message_id: &uuid::Uuid,
) -> crate::core::error::Result<()> {
self.local_cache.remove_pending_message(message_id).await
}
async fn get_storage_usage(&self) -> crate::core::error::Result<u64> {
self.local_cache.get_storage_usage().await
}
async fn cleanup_storage(&self, target_size_bytes: u64) -> crate::core::error::Result<u64> {
self.local_cache.cleanup_storage(target_size_bytes).await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn clear_indexes(&self) {
self.local_cache.clear_indexes();
}
}