#[cfg(feature = "fuse")]
use std::sync::Arc;
#[cfg(feature = "fuse")]
use tokio::sync::RwLock;
use url::Url;
use crate::blobs::{Blobs, BlobsSetupError};
use crate::database::{Database, DatabaseSetupError};
#[cfg(feature = "fuse")]
use crate::fuse::{MountManager, MountManagerConfig};
use crate::service_config::Config;
use crate::sync_provider::{QueuedSyncConfig, QueuedSyncProvider};
use common::crypto::SecretKey;
use common::peer::{Peer, PeerBuilder};
#[derive(Clone)]
pub struct State {
database: Database,
peer: Peer<Database>,
#[cfg(feature = "fuse")]
mount_manager: Arc<RwLock<Option<MountManager>>>,
}
impl State {
pub async fn from_config(config: &Config) -> Result<Self, StateSetupError> {
let sqlite_database_url = match config.sqlite_path {
Some(ref path) => {
if !path.exists() {
return Err(StateSetupError::DatabasePathDoesNotExist);
}
Url::parse(&format!("sqlite://{}", path.display()))
.map_err(|_| StateSetupError::InvalidDatabaseUrl)
}
None => Url::parse("sqlite::memory:").map_err(|_| StateSetupError::InvalidDatabaseUrl),
}?;
tracing::info!("Database URL: {:?}", sqlite_database_url);
let database = Database::connect(&sqlite_database_url).await?;
let node_secret = config
.node_secret
.clone()
.unwrap_or_else(SecretKey::generate);
tracing::debug!("ServiceState::from_config - loading blobs store");
let blobs =
Blobs::setup(&config.blob_store, &config.jax_dir, config.max_import_size).await?;
tracing::debug!("ServiceState::from_config - blobs store loaded successfully");
let (sync_provider, job_receiver) = QueuedSyncProvider::new(QueuedSyncConfig::default());
let mut peer_builder = PeerBuilder::new()
.with_sync_provider(std::sync::Arc::new(sync_provider))
.log_provider(database.clone())
.blobs_store(blobs.into_inner())
.secret_key(node_secret.clone());
if let Some(addr) = config.node_listen_addr {
peer_builder = peer_builder.socket_address(addr);
}
let peer = peer_builder.build().await;
let bound_addrs = peer.endpoint().bound_sockets();
tracing::info!("Node id: {} (with JAX protocol)", peer.id());
tracing::info!("Peer listening on: {:?}", bound_addrs);
let peer_for_worker = peer.clone();
let job_stream = job_receiver.into_async();
tokio::spawn(async move {
crate::sync_provider::run_worker(peer_for_worker, job_stream).await;
});
let state = Self {
database: database.clone(),
peer: peer.clone(),
#[cfg(feature = "fuse")]
mount_manager: Arc::new(RwLock::new(None)),
};
#[cfg(feature = "fuse")]
{
let mount_manager = MountManager::new(
database,
peer,
MountManagerConfig {
api_port: config.api_port,
..MountManagerConfig::default()
},
);
*state.mount_manager.write().await = Some(mount_manager);
}
Ok(state)
}
pub fn peer(&self) -> &Peer<Database> {
&self.peer
}
pub fn node(&self) -> &Peer<Database> {
&self.peer
}
pub fn database(&self) -> &Database {
&self.database
}
#[cfg(feature = "fuse")]
pub fn mount_manager(&self) -> &Arc<RwLock<Option<MountManager>>> {
&self.mount_manager
}
}
impl AsRef<Peer<Database>> for State {
fn as_ref(&self) -> &Peer<Database> {
&self.peer
}
}
impl AsRef<Database> for State {
fn as_ref(&self) -> &Database {
self.database()
}
}
#[derive(Debug, thiserror::Error)]
pub enum StateSetupError {
#[error("Database path does not exist")]
DatabasePathDoesNotExist,
#[error("Database setup error")]
DatabaseSetupError(#[from] DatabaseSetupError),
#[error("Invalid database URL")]
InvalidDatabaseUrl,
#[error("Blobs setup error: {0}")]
BlobsSetupError(#[from] BlobsSetupError),
}