jax_daemon/
service_state.rs1#[cfg(feature = "fuse")]
2use std::sync::Arc;
3
4#[cfg(feature = "fuse")]
5use tokio::sync::RwLock;
6use url::Url;
7
8use crate::blobs::{Blobs, BlobsSetupError};
9use crate::database::{Database, DatabaseSetupError};
10#[cfg(feature = "fuse")]
11use crate::fuse::{MountManager, MountManagerConfig};
12use crate::service_config::Config;
13use crate::sync_provider::{QueuedSyncConfig, QueuedSyncProvider};
14
15use common::crypto::SecretKey;
16use common::peer::{Peer, PeerBuilder};
17
18#[derive(Clone)]
20pub struct State {
21 database: Database,
22 peer: Peer<Database>,
23 #[cfg(feature = "fuse")]
24 mount_manager: Arc<RwLock<Option<MountManager>>>,
25}
26
27impl State {
28 pub async fn from_config(config: &Config) -> Result<Self, StateSetupError> {
29 let sqlite_database_url = match config.sqlite_path {
31 Some(ref path) => {
32 if !path.exists() {
34 return Err(StateSetupError::DatabasePathDoesNotExist);
35 }
36 Url::parse(&format!("sqlite://{}", path.display()))
38 .map_err(|_| StateSetupError::InvalidDatabaseUrl)
39 }
40 None => Url::parse("sqlite::memory:").map_err(|_| StateSetupError::InvalidDatabaseUrl),
42 }?;
43 tracing::info!("Database URL: {:?}", sqlite_database_url);
44 let database = Database::connect(&sqlite_database_url).await?;
45
46 let node_secret = config
48 .node_secret
49 .clone()
50 .unwrap_or_else(SecretKey::generate);
51
52 tracing::debug!("ServiceState::from_config - loading blobs store");
54 let blobs = Blobs::setup(&config.blob_store, &config.jax_dir).await?;
55 tracing::debug!("ServiceState::from_config - blobs store loaded successfully");
56
57 let (sync_provider, job_receiver) = QueuedSyncProvider::new(QueuedSyncConfig::default());
62
63 let mut peer_builder = PeerBuilder::new()
64 .with_sync_provider(std::sync::Arc::new(sync_provider))
65 .log_provider(database.clone())
66 .blobs_store(blobs.into_inner())
67 .secret_key(node_secret.clone());
68
69 if let Some(addr) = config.node_listen_addr {
70 peer_builder = peer_builder.socket_address(addr);
71 }
72
73 let peer = peer_builder.build().await;
74
75 let bound_addrs = peer.endpoint().bound_sockets();
77 tracing::info!("Node id: {} (with JAX protocol)", peer.id());
78 tracing::info!("Peer listening on: {:?}", bound_addrs);
79
80 let peer_for_worker = peer.clone();
83 let job_stream = job_receiver.into_async();
84 tokio::spawn(async move {
85 crate::sync_provider::run_worker(peer_for_worker, job_stream).await;
86 });
87
88 let state = Self {
90 database: database.clone(),
91 peer: peer.clone(),
92 #[cfg(feature = "fuse")]
93 mount_manager: Arc::new(RwLock::new(None)),
94 };
95
96 #[cfg(feature = "fuse")]
98 {
99 let mount_manager = MountManager::new(database, peer, MountManagerConfig::default());
100 *state.mount_manager.write().await = Some(mount_manager);
101 }
102
103 Ok(state)
104 }
105
106 pub fn peer(&self) -> &Peer<Database> {
107 &self.peer
108 }
109
110 pub fn node(&self) -> &Peer<Database> {
111 &self.peer
113 }
114
115 pub fn database(&self) -> &Database {
116 &self.database
117 }
118
119 #[cfg(feature = "fuse")]
121 pub fn mount_manager(&self) -> &Arc<RwLock<Option<MountManager>>> {
122 &self.mount_manager
123 }
124}
125
126impl AsRef<Peer<Database>> for State {
127 fn as_ref(&self) -> &Peer<Database> {
128 &self.peer
129 }
130}
131
132impl AsRef<Database> for State {
133 fn as_ref(&self) -> &Database {
134 self.database()
135 }
136}
137
138#[derive(Debug, thiserror::Error)]
139pub enum StateSetupError {
140 #[error("Database path does not exist")]
141 DatabasePathDoesNotExist,
142 #[error("Database setup error")]
143 DatabaseSetupError(#[from] DatabaseSetupError),
144 #[error("Invalid database URL")]
145 InvalidDatabaseUrl,
146 #[error("Blobs setup error: {0}")]
147 BlobsSetupError(#[from] BlobsSetupError),
148}