jax_daemon/
service_state.rs1#[cfg(feature = "fuse")]
2use std::sync::Arc;
3
4#[cfg(feature = "fuse")]
5use tokio::sync::RwLock;
6use url::Url;
7
8use crate::blobs::{Blobs, BlobsSetupError};
9use crate::database::{Database, DatabaseSetupError};
10#[cfg(feature = "fuse")]
11use crate::fuse::{MountManager, MountManagerConfig};
12use crate::service_config::Config;
13use crate::sync_provider::{QueuedSyncConfig, QueuedSyncProvider};
14
15use common::crypto::SecretKey;
16use common::peer::{Peer, PeerBuilder};
17
18#[derive(Clone)]
20pub struct State {
21 database: Database,
22 peer: Peer<Database>,
23 #[cfg(feature = "fuse")]
24 mount_manager: Arc<RwLock<Option<MountManager>>>,
25}
26
27impl State {
28 pub async fn from_config(config: &Config) -> Result<Self, StateSetupError> {
29 let sqlite_database_url = match config.sqlite_path {
31 Some(ref path) => {
32 if !path.exists() {
34 return Err(StateSetupError::DatabasePathDoesNotExist);
35 }
36 Url::parse(&format!("sqlite://{}", path.display()))
38 .map_err(|_| StateSetupError::InvalidDatabaseUrl)
39 }
40 None => Url::parse("sqlite::memory:").map_err(|_| StateSetupError::InvalidDatabaseUrl),
42 }?;
43 tracing::info!("Database URL: {:?}", sqlite_database_url);
44 let database = Database::connect(&sqlite_database_url).await?;
45
46 let node_secret = config
48 .node_secret
49 .clone()
50 .unwrap_or_else(SecretKey::generate);
51
52 tracing::debug!("ServiceState::from_config - loading blobs store");
54 let blobs =
55 Blobs::setup(&config.blob_store, &config.jax_dir, config.max_import_size).await?;
56 tracing::debug!("ServiceState::from_config - blobs store loaded successfully");
57
58 let (sync_provider, job_receiver) = QueuedSyncProvider::new(QueuedSyncConfig::default());
63
64 let mut peer_builder = PeerBuilder::new()
65 .with_sync_provider(std::sync::Arc::new(sync_provider))
66 .log_provider(database.clone())
67 .blobs_store(blobs.into_inner())
68 .secret_key(node_secret.clone());
69
70 if let Some(addr) = config.node_listen_addr {
71 peer_builder = peer_builder.socket_address(addr);
72 }
73
74 let peer = peer_builder.build().await;
75
76 let bound_addrs = peer.endpoint().bound_sockets();
78 tracing::info!("Node id: {} (with JAX protocol)", peer.id());
79 tracing::info!("Peer listening on: {:?}", bound_addrs);
80
81 let peer_for_worker = peer.clone();
84 let job_stream = job_receiver.into_async();
85 tokio::spawn(async move {
86 crate::sync_provider::run_worker(peer_for_worker, job_stream).await;
87 });
88
89 let state = Self {
91 database: database.clone(),
92 peer: peer.clone(),
93 #[cfg(feature = "fuse")]
94 mount_manager: Arc::new(RwLock::new(None)),
95 };
96
97 #[cfg(feature = "fuse")]
99 {
100 let mount_manager = MountManager::new(
101 database,
102 peer,
103 MountManagerConfig {
104 api_port: config.api_port,
105 ..MountManagerConfig::default()
106 },
107 );
108 *state.mount_manager.write().await = Some(mount_manager);
109 }
110
111 Ok(state)
112 }
113
114 pub fn peer(&self) -> &Peer<Database> {
115 &self.peer
116 }
117
118 pub fn node(&self) -> &Peer<Database> {
119 &self.peer
121 }
122
123 pub fn database(&self) -> &Database {
124 &self.database
125 }
126
127 #[cfg(feature = "fuse")]
129 pub fn mount_manager(&self) -> &Arc<RwLock<Option<MountManager>>> {
130 &self.mount_manager
131 }
132}
133
134impl AsRef<Peer<Database>> for State {
135 fn as_ref(&self) -> &Peer<Database> {
136 &self.peer
137 }
138}
139
140impl AsRef<Database> for State {
141 fn as_ref(&self) -> &Database {
142 self.database()
143 }
144}
145
146#[derive(Debug, thiserror::Error)]
147pub enum StateSetupError {
148 #[error("Database path does not exist")]
149 DatabasePathDoesNotExist,
150 #[error("Database setup error")]
151 DatabaseSetupError(#[from] DatabaseSetupError),
152 #[error("Invalid database URL")]
153 InvalidDatabaseUrl,
154 #[error("Blobs setup error: {0}")]
155 BlobsSetupError(#[from] BlobsSetupError),
156}