Skip to main content

jax_daemon/
service_state.rs

1#[cfg(feature = "fuse")]
2use std::sync::Arc;
3
4#[cfg(feature = "fuse")]
5use tokio::sync::RwLock;
6use url::Url;
7
8use crate::blobs::{Blobs, BlobsSetupError};
9use crate::database::{Database, DatabaseSetupError};
10#[cfg(feature = "fuse")]
11use crate::fuse::{MountManager, MountManagerConfig};
12use crate::service_config::Config;
13use crate::sync_provider::{QueuedSyncConfig, QueuedSyncProvider};
14
15use common::crypto::SecretKey;
16use common::peer::{Peer, PeerBuilder};
17
18/// Main service state - orchestrates all components
19#[derive(Clone)]
20pub struct State {
21    database: Database,
22    peer: Peer<Database>,
23    #[cfg(feature = "fuse")]
24    mount_manager: Arc<RwLock<Option<MountManager>>>,
25}
26
27impl State {
28    pub async fn from_config(config: &Config) -> Result<Self, StateSetupError> {
29        // 1. Setup database
30        let sqlite_database_url = match config.sqlite_path {
31            Some(ref path) => {
32                // check that the path exists
33                if !path.exists() {
34                    return Err(StateSetupError::DatabasePathDoesNotExist);
35                }
36                // parse the path into a URL
37                Url::parse(&format!("sqlite://{}", path.display()))
38                    .map_err(|_| StateSetupError::InvalidDatabaseUrl)
39            }
40            // otherwise just set up an in-memory database
41            None => Url::parse("sqlite::memory:").map_err(|_| StateSetupError::InvalidDatabaseUrl),
42        }?;
43        tracing::info!("Database URL: {:?}", sqlite_database_url);
44        let database = Database::connect(&sqlite_database_url).await?;
45
46        // 2. Setup node secret
47        let node_secret = config
48            .node_secret
49            .clone()
50            .unwrap_or_else(SecretKey::generate);
51
52        // 3. Setup blobs store using the new blobs module
53        tracing::debug!("ServiceState::from_config - loading blobs store");
54        let blobs =
55            Blobs::setup(&config.blob_store, &config.jax_dir, config.max_import_size).await?;
56        tracing::debug!("ServiceState::from_config - blobs store loaded successfully");
57
58        // 4. Build peer from the database as the log provider
59        // TODO: Make queue size configurable via config
60
61        // Create sync provider with worker
62        let (sync_provider, job_receiver) = QueuedSyncProvider::new(QueuedSyncConfig::default());
63
64        let mut peer_builder = PeerBuilder::new()
65            .with_sync_provider(std::sync::Arc::new(sync_provider))
66            .log_provider(database.clone())
67            .blobs_store(blobs.into_inner())
68            .secret_key(node_secret.clone());
69
70        if let Some(addr) = config.node_listen_addr {
71            peer_builder = peer_builder.socket_address(addr);
72        }
73
74        let peer = peer_builder.build().await;
75
76        // Log the bound addresses
77        let bound_addrs = peer.endpoint().bound_sockets();
78        tracing::info!("Node id: {} (with JAX protocol)", peer.id());
79        tracing::info!("Peer listening on: {:?}", bound_addrs);
80
81        // Spawn the worker for the queued sync provider
82        // The worker is managed outside the peer, like the database
83        let peer_for_worker = peer.clone();
84        let job_stream = job_receiver.into_async();
85        tokio::spawn(async move {
86            crate::sync_provider::run_worker(peer_for_worker, job_stream).await;
87        });
88
89        // Create the initial state
90        let state = Self {
91            database: database.clone(),
92            peer: peer.clone(),
93            #[cfg(feature = "fuse")]
94            mount_manager: Arc::new(RwLock::new(None)),
95        };
96
97        // Initialize mount manager with fuse feature
98        #[cfg(feature = "fuse")]
99        {
100            let mount_manager = MountManager::new(
101                database,
102                peer,
103                MountManagerConfig {
104                    api_port: config.api_port,
105                    ..MountManagerConfig::default()
106                },
107            );
108            *state.mount_manager.write().await = Some(mount_manager);
109        }
110
111        Ok(state)
112    }
113
114    pub fn peer(&self) -> &Peer<Database> {
115        &self.peer
116    }
117
118    pub fn node(&self) -> &Peer<Database> {
119        // Alias for backwards compatibility
120        &self.peer
121    }
122
123    pub fn database(&self) -> &Database {
124        &self.database
125    }
126
127    /// Get the mount manager (only available with fuse feature)
128    #[cfg(feature = "fuse")]
129    pub fn mount_manager(&self) -> &Arc<RwLock<Option<MountManager>>> {
130        &self.mount_manager
131    }
132}
133
134impl AsRef<Peer<Database>> for State {
135    fn as_ref(&self) -> &Peer<Database> {
136        &self.peer
137    }
138}
139
140impl AsRef<Database> for State {
141    fn as_ref(&self) -> &Database {
142        self.database()
143    }
144}
145
146#[derive(Debug, thiserror::Error)]
147pub enum StateSetupError {
148    #[error("Database path does not exist")]
149    DatabasePathDoesNotExist,
150    #[error("Database setup error")]
151    DatabaseSetupError(#[from] DatabaseSetupError),
152    #[error("Invalid database URL")]
153    InvalidDatabaseUrl,
154    #[error("Blobs setup error: {0}")]
155    BlobsSetupError(#[from] BlobsSetupError),
156}