use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use async_trait::async_trait;
use citadel_crypt::ratchets::mono::MonoRatchet;
use citadel_crypt::ratchets::stacked::StackedRatchet;
use citadel_crypt::ratchets::Ratchet;
#[cfg(all(feature = "redis", not(coverage)))]
use crate::backend::redis_backend::RedisConnectionOptions;
#[cfg(all(feature = "sql", not(coverage)))]
use crate::backend::sql_backend::SqlConnectionOptions;
use crate::client_account::ClientNetworkAccount;
use crate::misc::{AccountError, CNACMetadata};
use citadel_crypt::scramble::streaming_crypt_scrambler::ObjectSource;
use citadel_io::tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use citadel_types::proto::{ObjectTransferStatus, VirtualObjectMetadata};
use citadel_types::user;
use citadel_types::user::MutualPeer;
// Backend submodules. Several of them are compiled only under specific
// feature/target configurations, as spelled out by the `cfg` attributes below.
pub mod file_io;
// Built when either the `filesystem` or the `opfs` feature is enabled.
#[cfg(any(feature = "filesystem", feature = "opfs"))]
pub mod file_io_backend;
pub mod memory;
// Built only with the `opfs` feature.
#[cfg(feature = "opfs")]
pub mod opfs_file_io;
// Source of `RedisConnectionOptions` (imported at the top of this file).
// Requires the `redis` feature and is excluded under `cfg(coverage)`.
#[cfg(all(feature = "redis", not(coverage)))]
pub mod redis_backend;
// Source of `SqlConnectionOptions` (imported at the top of this file).
// Requires the `sql` feature and is excluded under `cfg(coverage)`.
#[cfg(all(feature = "sql", not(coverage)))]
pub mod sql_backend;
// Requires the `filesystem` feature and is never built for wasm targets.
#[cfg(all(feature = "filesystem", not(target_family = "wasm")))]
pub mod std_file_io;
// Explicitly exempted from the `missing_docs` lint.
#[allow(missing_docs)]
pub mod utils;
/// Identifies which kind of backend should be used, together with the
/// target (path or URL) and options that kind needs.
///
/// Every variant except `InMemory` exists only under the feature/target
/// configuration named in its `cfg` attribute.
#[derive(Clone, Debug, Eq, PartialEq)]
// The variants carry differently sized payloads (nothing, a `String`, or a
// `String` plus an options struct), hence the lint exemption.
#[allow(variant_size_differences)]
pub enum BackendType {
/// Carries no payload; always available and used as the `Default` fallback.
InMemory,
/// Filesystem target. The `String` is the path; the `filesystem`
/// constructor stores it with the `file:` scheme removed.
#[cfg(all(feature = "filesystem", not(target_family = "wasm")))]
Filesystem(String),
/// OPFS target. The `String` is the path; the `opfs` constructor stores it
/// with the `opfs://` scheme removed.
#[cfg(feature = "opfs")]
Opfs(String),
/// SQL target: the connection URL (stored as given by `sql`/`sql_with`)
/// and its connection options.
#[cfg(all(feature = "sql", not(coverage)))]
SQLDatabase(String, SqlConnectionOptions),
/// Redis target: the connection URL (stored as given by
/// `redis`/`redis_with`) and its connection options.
#[cfg(all(feature = "redis", not(coverage)))]
Redis(String, RedisConnectionOptions),
}
impl Default for BackendType {
    /// Picks the default backend for the current build configuration.
    ///
    /// With the `filesystem` feature on a non-wasm target this is
    /// `Filesystem("<home>/.citadel/<random-u128>")`, where the last path
    /// component is a freshly generated v4 UUID rendered as a `u128`.
    ///
    /// In every other configuration the result is `InMemory`. It is also the
    /// result when the home directory cannot be determined or is not valid
    /// UTF-8, so `default()` never panics. The generated directory name is
    /// random on every call, so falling back to memory does not lose any
    /// previously persisted state.
    fn default() -> Self {
        #[cfg(all(feature = "filesystem", not(target_family = "wasm")))]
        {
            if let Some(mut home_dir) = dirs2::home_dir() {
                home_dir.push(format!(".citadel/{}", uuid::Uuid::new_v4().as_u128()));
                // The variant stores a `String`, so a non-UTF-8 path cannot be
                // represented faithfully; fall through to `InMemory` instead.
                if let Some(path) = home_dir.to_str() {
                    return BackendType::Filesystem(path.to_string());
                }
            }
        }
        BackendType::InMemory
    }
}
impl BackendType {
    /// Returns `true` when `self` is the `Filesystem` variant.
    ///
    /// Always `false` in builds where that variant is compiled out (no
    /// `filesystem` feature, or a wasm target).
    pub fn is_filesystem_backend(&self) -> bool {
        #[cfg(all(feature = "filesystem", not(target_family = "wasm")))]
        {
            matches!(self, BackendType::Filesystem(..))
        }
        #[cfg(not(all(feature = "filesystem", not(target_family = "wasm"))))]
        {
            false
        }
    }

    /// Chooses a backend from the prefix of `url`.
    ///
    /// Prefixes are tested in this order, each only when its feature is
    /// compiled in:
    /// - `redis` → [`BackendType::redis`]
    /// - `mysql`, `postgres`, `sqlite` → [`BackendType::sql`]
    /// - `file:` → [`BackendType::filesystem`]
    /// - `opfs://` → [`BackendType::opfs`]
    ///
    /// # Errors
    ///
    /// Returns an error built with `ErrorCode::BackendTargetInvalid` (carrying
    /// the offending address) when no enabled backend recognizes the prefix.
    pub fn new<T: Into<String>>(url: T) -> Result<Self, AccountError> {
        let addr = url.into();

        #[cfg(all(feature = "redis", not(coverage)))]
        {
            if addr.starts_with("redis") {
                return Ok(BackendType::redis(addr));
            }
        }

        #[cfg(all(feature = "sql", not(coverage)))]
        {
            if addr.starts_with("mysql")
                || addr.starts_with("postgres")
                || addr.starts_with("sqlite")
            {
                return Ok(BackendType::sql(addr));
            }
        }

        #[cfg(all(feature = "filesystem", not(target_family = "wasm")))]
        {
            if addr.starts_with("file:") {
                return Ok(Self::filesystem(addr));
            }
        }

        #[cfg(feature = "opfs")]
        {
            if addr.starts_with("opfs://") {
                return Ok(Self::opfs(addr));
            }
        }

        Err(citadel_io::error!(
            citadel_io::ErrorCode::BackendTargetInvalid,
            addr.to_string()
        ))
    }

    /// Builds a `Filesystem` backend from `path`.
    ///
    /// A single leading `file:` scheme is removed if present. Only the prefix
    /// is touched: later occurrences of `file:` (for example inside a
    /// directory named `profile:1`) are part of the path and are preserved.
    #[cfg(all(feature = "filesystem", not(target_family = "wasm")))]
    pub fn filesystem<T: Into<String>>(path: T) -> Self {
        let path = path.into();
        let without_scheme = path.strip_prefix("file:").map(str::to_owned);
        Self::Filesystem(without_scheme.unwrap_or(path))
    }

    /// Builds an `Opfs` backend from `path`.
    ///
    /// A single leading `opfs://` scheme is removed if present; the rest of
    /// the path is stored unchanged.
    #[cfg(feature = "opfs")]
    pub fn opfs<T: Into<String>>(path: T) -> Self {
        let path = path.into();
        let without_scheme = path.strip_prefix("opfs://").map(str::to_owned);
        Self::Opfs(without_scheme.unwrap_or(path))
    }

    /// Builds a `Redis` backend for `url` with default connection options.
    #[cfg(all(feature = "redis", not(coverage)))]
    pub fn redis<T: Into<String>>(url: T) -> BackendType {
        Self::redis_with(url, Default::default())
    }

    /// Builds a `Redis` backend for `url` with the supplied connection options.
    #[cfg(all(feature = "redis", not(coverage)))]
    pub fn redis_with<T: Into<String>>(url: T, opts: RedisConnectionOptions) -> BackendType {
        BackendType::Redis(url.into(), opts)
    }

    /// Builds a `SQLDatabase` backend for `url` with default connection options.
    #[cfg(all(feature = "sql", not(coverage)))]
    pub fn sql<T: Into<String>>(url: T) -> BackendType {
        BackendType::SQLDatabase(url.into(), Default::default())
    }

    /// Builds a `SQLDatabase` backend for `url` with the supplied connection options.
    #[cfg(all(feature = "sql", not(coverage)))]
    pub fn sql_with<T: Into<String>>(url: T, opts: SqlConnectionOptions) -> BackendType {
        BackendType::SQLDatabase(url.into(), opts)
    }
}
/// Interface every persistence backend implements.
///
/// `R` and `Fcm` are the ratchet types that parameterize the stored
/// [`ClientNetworkAccount`]s. Methods without a body are defined entirely by
/// the implementor; the summaries below restate their signatures, and any
/// statement about the meaning of `None`/empty results is an assumption to be
/// confirmed against the concrete backends.
#[async_trait]
pub trait BackendConnection<R: Ratchet, Fcm: Ratchet>: Send + Sync {
    /// Establishes the connection to the backend.
    async fn connect(&mut self) -> Result<(), AccountError>;

    /// Reports whether the backend is connected.
    async fn is_connected(&self) -> Result<bool, AccountError>;

    /// Saves `cnac` to the backend.
    async fn save_cnac(&self, cnac: &ClientNetworkAccount<R, Fcm>) -> Result<(), AccountError>;

    /// Fetches the account stored under `cid` (presumably `None` when absent).
    async fn get_cnac_by_cid(
        &self,
        cid: u64,
    ) -> Result<Option<ClientNetworkAccount<R, Fcm>>, AccountError>;

    /// Fetches an account by username.
    ///
    /// The default implementation converts `username` with
    /// `user::username_to_cid` and delegates to [`Self::get_cnac_by_cid`].
    async fn get_client_by_username(
        &self,
        username: &str,
    ) -> Result<Option<ClientNetworkAccount<R, Fcm>>, AccountError> {
        let cid = user::username_to_cid(username);
        self.get_cnac_by_cid(cid).await
    }

    /// Reports whether `cid` is registered.
    async fn cid_is_registered(&self, cid: u64) -> Result<bool, AccountError>;

    /// Deletes the account stored under `cid`.
    async fn delete_cnac_by_cid(&self, cid: u64) -> Result<(), AccountError>;

    /// Purges the backend.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of removed
    /// entries — confirm against implementors.
    async fn purge(&self) -> Result<usize, AccountError>;

    /// Reports whether `username` is taken.
    ///
    /// The default implementation converts `username` with
    /// `user::username_to_cid` and delegates to [`Self::cid_is_registered`].
    async fn username_exists(&self, username: &str) -> Result<bool, AccountError> {
        let cid = user::username_to_cid(username);
        self.cid_is_registered(cid).await
    }

    /// Lists registered impersonal CIDs, optionally capped by `limit`.
    async fn get_registered_impersonal_cids(
        &self,
        limit: Option<i32>,
    ) -> Result<Option<Vec<u64>>, AccountError>;

    /// Looks up the username associated with `cid`.
    async fn get_username_by_cid(&self, cid: u64) -> Result<Option<String>, AccountError>;

    /// Looks up the full name associated with `cid`.
    async fn get_full_name_by_cid(&self, cid: u64) -> Result<Option<String>, AccountError>;

    /// Maps `username` to its CID.
    ///
    /// The default implementation is a pure computation via
    /// `user::username_to_cid`; it does not consult the backend.
    fn get_cid_by_username(&self, username: &str) -> u64 {
        user::username_to_cid(username)
    }

    /// Registers a peer-to-peer relation between `cid0` and `cid1` (server side).
    async fn register_p2p_as_server(&self, cid0: u64, cid1: u64) -> Result<(), AccountError>;

    /// Registers `peer_cid` / `peer_username` as a peer of `session_cid` (client side).
    async fn register_p2p_as_client(
        &self,
        session_cid: u64,
        peer_cid: u64,
        peer_username: String,
    ) -> Result<(), AccountError>;

    /// Removes the peer-to-peer relation between `cid0` and `cid1` (server side).
    async fn deregister_p2p_as_server(&self, cid0: u64, cid1: u64) -> Result<(), AccountError>;

    /// Removes `peer_cid` from the peers of `session_cid` (client side).
    ///
    /// NOTE(review): the returned `MutualPeer` is presumably the removed
    /// entry — confirm against implementors.
    async fn deregister_p2p_as_client(
        &self,
        session_cid: u64,
        peer_cid: u64,
    ) -> Result<Option<MutualPeer>, AccountError>;

    /// Lists the CIDs of the hyperlan peers of `session_cid`.
    async fn get_hyperlan_peer_list(
        &self,
        session_cid: u64,
    ) -> Result<Option<Vec<u64>>, AccountError>;

    /// Fetches the metadata of the client identified by `session_cid`.
    async fn get_client_metadata(
        &self,
        session_cid: u64,
    ) -> Result<Option<CNACMetadata>, AccountError>;

    /// Fetches metadata for multiple clients, optionally capped by `limit`.
    async fn get_clients_metadata(
        &self,
        limit: Option<i32>,
    ) -> Result<Vec<CNACMetadata>, AccountError>;

    /// Fetches the hyperlan peer `peer_cid` of `session_cid`.
    async fn get_hyperlan_peer_by_cid(
        &self,
        session_cid: u64,
        peer_cid: u64,
    ) -> Result<Option<MutualPeer>, AccountError>;

    /// Reports whether `peer_cid` is a hyperlan peer of `session_cid`.
    async fn hyperlan_peer_exists(
        &self,
        session_cid: u64,
        peer_cid: u64,
    ) -> Result<bool, AccountError>;

    /// Checks each CID in `peers` against `session_cid`.
    ///
    /// NOTE(review): the result presumably has one flag per entry of `peers`,
    /// in the same order — confirm against implementors.
    async fn hyperlan_peers_are_mutuals(
        &self,
        session_cid: u64,
        peers: &[u64],
    ) -> Result<Vec<bool>, AccountError>;

    /// Fetches the hyperlan peers of `session_cid` named in `peers`.
    async fn get_hyperlan_peers(
        &self,
        session_cid: u64,
        peers: &[u64],
    ) -> Result<Vec<MutualPeer>, AccountError>;

    /// Fetches a hyperlan peer of `session_cid` by username.
    ///
    /// The default implementation converts `username` with
    /// `user::username_to_cid` and delegates to
    /// [`Self::get_hyperlan_peer_by_cid`].
    async fn get_hyperlan_peer_by_username(
        &self,
        session_cid: u64,
        username: &str,
    ) -> Result<Option<MutualPeer>, AccountError> {
        let peer_cid = user::username_to_cid(username);
        self.get_hyperlan_peer_by_cid(session_cid, peer_cid).await
    }

    /// Lists the hyperlan peers of `session_cid` as full `MutualPeer` entries
    /// (server side).
    async fn get_hyperlan_peer_list_as_server(
        &self,
        session_cid: u64,
    ) -> Result<Option<Vec<MutualPeer>>, AccountError>;

    /// Synchronizes the peer list of `cnac` with `peers` (client side).
    async fn synchronize_hyperlan_peer_list_as_client(
        &self,
        cnac: &ClientNetworkAccount<R, Fcm>,
        peers: Vec<MutualPeer>,
    ) -> Result<(), AccountError>;

    /// Reads the bytes addressed by `(session_cid, peer_cid, key, sub_key)`.
    async fn get_byte_map_value(
        &self,
        session_cid: u64,
        peer_cid: u64,
        key: &str,
        sub_key: &str,
    ) -> Result<Option<Vec<u8>>, AccountError>;

    /// Removes the bytes addressed by `(session_cid, peer_cid, key, sub_key)`.
    ///
    /// NOTE(review): the returned bytes are presumably the removed value —
    /// confirm against implementors.
    async fn remove_byte_map_value(
        &self,
        session_cid: u64,
        peer_cid: u64,
        key: &str,
        sub_key: &str,
    ) -> Result<Option<Vec<u8>>, AccountError>;

    /// Stores `value` under `(session_cid, peer_cid, key, sub_key)`.
    ///
    /// NOTE(review): the returned bytes are presumably the previously stored
    /// value, if any — confirm against implementors.
    async fn store_byte_map_value(
        &self,
        session_cid: u64,
        peer_cid: u64,
        key: &str,
        sub_key: &str,
        value: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, AccountError>;

    /// Reads every value stored under `(session_cid, peer_cid, key)`.
    ///
    /// NOTE(review): the map is presumably keyed by sub-key — confirm against
    /// implementors.
    async fn get_byte_map_values_by_key(
        &self,
        session_cid: u64,
        peer_cid: u64,
        key: &str,
    ) -> Result<HashMap<String, Vec<u8>>, AccountError>;

    /// Removes every value stored under `(session_cid, peer_cid, key)`.
    ///
    /// NOTE(review): the map presumably holds the removed values keyed by
    /// sub-key — confirm against implementors.
    async fn remove_byte_map_values_by_key(
        &self,
        session_cid: u64,
        peer_cid: u64,
        key: &str,
    ) -> Result<HashMap<String, Vec<u8>>, AccountError>;

    /// Streams an object into the backend.
    ///
    /// Takes the byte-chunk receiver `source`, the metadata describing the
    /// destination object, and a sender for `ObjectTransferStatus` updates.
    async fn stream_object_to_backend(
        &self,
        source: UnboundedReceiver<Vec<u8>>,
        sink_metadata: &VirtualObjectMetadata,
        status_tx: UnboundedSender<ObjectTransferStatus>,
    ) -> Result<(), AccountError>;

    /// Returns a source and metadata for the object at `virtual_path` owned by `cid`.
    ///
    /// The default implementation ignores its arguments and always fails with
    /// `ErrorCode::RevfsUnsupported`.
    // Arguments are intentionally unused in the default body.
    #[allow(unused_variables)]
    async fn revfs_get_file_info(
        &self,
        cid: u64,
        virtual_path: std::path::PathBuf,
    ) -> Result<(Box<dyn ObjectSource>, VirtualObjectMetadata), AccountError> {
        Err(citadel_io::error!(citadel_io::ErrorCode::RevfsUnsupported))
    }

    /// Deletes the object at `virtual_path` owned by `cid`.
    ///
    /// The default implementation ignores its arguments and always fails with
    /// `ErrorCode::RevfsUnsupported`.
    // Arguments are intentionally unused in the default body.
    #[allow(unused_variables)]
    async fn revfs_delete(
        &self,
        cid: u64,
        virtual_path: std::path::PathBuf,
    ) -> Result<(), AccountError> {
        Err(citadel_io::error!(citadel_io::ErrorCode::RevfsUnsupported))
    }
}
/// Handle around a type-erased, reference-counted backend connection.
///
/// The ratchet type parameters default to `StackedRatchet` (`R`) and
/// `MonoRatchet` (`Fcm`).
pub struct PersistenceHandler<R: Ratchet = StackedRatchet, Fcm: Ratchet = MonoRatchet> {
// The backend, stored behind `Arc<dyn BackendConnection<..>>`.
inner: Arc<dyn BackendConnection<R, Fcm>>,
}
impl<R: Ratchet, Fcm: Ratchet> PersistenceHandler<R, Fcm> {
    /// Connects `inner` and wraps it in a handler.
    ///
    /// `connect` is awaited first, while the backend is still exclusively
    /// owned (it needs `&mut self`); only then is the backend moved behind an
    /// `Arc`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `inner.connect()`; no handler is
    /// created in that case.
    pub async fn create<T: BackendConnection<R, Fcm> + 'static>(
        inner: T,
    ) -> Result<Self, AccountError> {
        let mut backend = inner;
        backend.connect().await?;
        let handler = Self {
            inner: Arc::new(backend),
        };
        Ok(handler)
    }
}
impl<R: Ratchet, Fcm: Ratchet> Deref for PersistenceHandler<R, Fcm> {
    type Target = Arc<dyn BackendConnection<R, Fcm>>;

    /// Exposes the wrapped `Arc`, so `BackendConnection` methods can be called
    /// directly on the handler.
    fn deref(&self) -> &Self::Target {
        let Self { inner } = self;
        inner
    }
}
impl<R: Ratchet, Fcm: Ratchet> Clone for PersistenceHandler<R, Fcm> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}