1use std::{
7 future::Future,
8 io,
9 path::PathBuf,
10 sync::{
11 Arc,
12 atomic::{AtomicBool, Ordering},
13 },
14 time::Duration,
15};
16
17use anyhow::{Context, Result, anyhow};
18
19use brume::vfs::VirtualPathBuf;
20use futures::StreamExt;
21use interprocess::local_socket::{
22 GenericNamespaced, ListenerOptions, ToNsName, tokio::Listener, traits::tokio::Listener as _,
23};
24use log::{error, info, warn};
25use tarpc::{
26 serde_transport,
27 server::{BaseChannel, Channel},
28 tokio_serde::formats::Bincode,
29 tokio_util::{
30 codec::{LengthDelimitedCodec, length_delimited::Builder},
31 sync::CancellationToken,
32 },
33};
34use tokio::{
35 sync::{
36 Mutex,
37 mpsc::{UnboundedReceiver, unbounded_channel},
38 },
39 task::JoinHandle,
40 time,
41};
42
43use brume_daemon_proto::{
44 AnySynchroCreationInfo, BRUME_SOCK_NAME, BrumeService, SynchroId, SynchroSide, SynchroState,
45};
46
47use crate::{
48 db::{Database, DatabaseConfig},
49 server::Server,
50 synchro_list::ReadWriteSynchroList,
51};
52
53#[derive(Clone)]
55pub struct DaemonConfig {
56 sync_interval: Duration,
58 error_mode: ErrorMode,
60 sock_name: String,
62 db: DatabaseConfig,
64}
65
66impl Default for DaemonConfig {
67 fn default() -> Self {
68 Self {
69 sync_interval: Duration::from_secs(10),
70 error_mode: ErrorMode::default(),
71 sock_name: BRUME_SOCK_NAME.to_string(),
72 db: DatabaseConfig::OnDisk(PathBuf::from("./dev.db")), }
74 }
75}
76
77impl DaemonConfig {
78 pub fn with_sync_interval(self, sync_interval: Duration) -> Self {
79 Self {
80 sync_interval,
81 ..self
82 }
83 }
84
85 pub fn with_error_mode(self, error_mode: ErrorMode) -> Self {
86 Self { error_mode, ..self }
87 }
88
89 pub fn with_sock_name(self, sock_name: &str) -> Self {
90 Self {
91 sock_name: sock_name.to_string(),
92 ..self
93 }
94 }
95
96 pub fn with_db_config(self, db_config: DatabaseConfig) -> Self {
97 Self {
98 db: db_config,
99 ..self
100 }
101 }
102}
103
/// What the daemon does after logging a failure.
///
/// `Debug` is derived so the mode can be printed in logs and used with
/// `assert_eq!` / `{:?}` like the other public types of this module.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum ErrorMode {
    /// The failure is logged and the daemon keeps running. This is the default.
    #[default]
    Log,
    /// The failure is logged, then returned as an error to the caller.
    Exit,
}
111
112#[derive(Debug)]
114pub enum UserCommand {
115 SynchroCreation(SynchroCreationRequest),
116 SynchroDeletion(SynchroDeletionRequest),
117 StateChange(StateChangeRequest),
118 ConflictResolution(ConflictResolutionRequest),
119}
120
121#[derive(Debug)]
123pub struct SynchroCreationRequest {
124 info: AnySynchroCreationInfo,
125}
126
127impl SynchroCreationRequest {
128 pub fn new(info: AnySynchroCreationInfo) -> Self {
129 Self { info }
130 }
131}
132
133#[derive(Debug)]
135pub struct SynchroDeletionRequest {
136 id: SynchroId,
137}
138
139impl SynchroDeletionRequest {
140 pub fn new(id: SynchroId) -> Self {
141 Self { id }
142 }
143}
144
145#[derive(Debug)]
147pub struct StateChangeRequest {
148 id: SynchroId,
149 state: SynchroState,
150}
151
152impl StateChangeRequest {
153 pub fn new(id: SynchroId, state: SynchroState) -> Self {
154 Self { id, state }
155 }
156}
157
158#[derive(Debug)]
160pub struct ConflictResolutionRequest {
161 id: SynchroId,
162 path: VirtualPathBuf,
163 side: SynchroSide,
164}
165
166impl ConflictResolutionRequest {
167 pub fn new(id: SynchroId, path: VirtualPathBuf, side: SynchroSide) -> Self {
168 Self { id, path, side }
169 }
170}
171
172pub struct Daemon {
176 codec_builder: Builder,
177 rpc_listener: Listener,
178 synchro_list: ReadWriteSynchroList,
179 server: Server,
180 commands_chan: Mutex<UnboundedReceiver<UserCommand>>,
181 is_running: AtomicBool,
182 cancellation_token: CancellationToken,
183 database: Database,
184 config: DaemonConfig,
185}
186
187impl Daemon {
188 pub async fn new(config: DaemonConfig) -> Result<Self> {
189 let name = config
190 .sock_name
191 .as_str()
192 .to_ns_name::<GenericNamespaced>()
193 .context("Invalid name for sock")?;
194 let opts = ListenerOptions::new().name(name);
195 let listener = match opts.create_tokio() {
196 Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
197 error!(
198 "Error: could not start server because the socket file is occupied. \
199Please check if {} is in use by another process and try again.",
200 config.sock_name
201 );
202 return Err(e).context("Failed to start server");
203 }
204 x => x?,
205 };
206
207 let codec_builder = LengthDelimitedCodec::builder();
208 let (commands_to_daemon, commands_from_server) = unbounded_channel();
209
210 info!("Loading db: {}", config.db.as_str().unwrap());
211 let database = Database::new(&config.db).await?;
212
213 let synchro_list = ReadWriteSynchroList::from(database.load_all_synchros().await.unwrap());
214
215 info!("Server running at {}", config.sock_name);
216 let server = Server::new(commands_to_daemon, synchro_list.as_read_only());
217
218 Ok(Self {
219 codec_builder,
220 rpc_listener: listener,
221 synchro_list,
222 server,
223 commands_chan: Mutex::new(commands_from_server),
224 is_running: AtomicBool::new(false),
225 cancellation_token: CancellationToken::new(),
226 config,
227 database,
228 })
229 }
230
231 pub fn stop(&self) {
233 self.cancellation_token.cancel()
234 }
235
236 pub async fn spawn(self: &Arc<Self>) -> JoinHandle<Result<()>> {
238 let daemon = self.clone();
239 tokio::spawn(async move { daemon.run().await })
240 }
241
242 pub async fn run(self: Arc<Self>) -> Result<()> {
244 if self
245 .is_running
246 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
247 .is_err()
248 {
249 return Err(anyhow!("Daemon is already running"));
250 }
251 {
253 let server = self.clone();
254 tokio::spawn(async move { server.serve().await });
255 }
256
257 let mut interval = time::interval(self.config.sync_interval);
259 interval.tick().await; loop {
262 info!("Starting full sync for all filesystems");
263 let synchro_list = self.synchro_list();
264
265 let results = synchro_list.sync_all(&self.database).await;
266
267 for res in results {
268 if let Err(err) = res {
269 let wrapped_err = anyhow!(err);
270 error!("Failed to synchronize filesystems: {wrapped_err:?}");
271 if self.config.error_mode == ErrorMode::Exit {
272 self.is_running.store(false, Ordering::Relaxed);
273 return Err(wrapped_err);
274 }
275 }
276 }
277
278 loop {
280 tokio::select! {
281 _ = interval.tick() => break,
282 Some(command) = self.recv_user_commands() => {
283 let res = self.handle_user_commands(command).await;
284 res.inspect_err(|_| {
285 self.is_running.store(false, Ordering::Relaxed);
286 })?
287 }
288 _ = self.cancellation_token.cancelled() => {
289 self.is_running.store(false, Ordering::Relaxed);
290 return Ok(())
291 }
292 }
293 }
294 }
295 }
296
297 pub async fn serve(&self) {
299 async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
300 tokio::spawn(fut);
301 }
302
303 loop {
304 let res = tokio::select! {
305 res = self.rpc_listener.accept() => res,
306 _ = self.cancellation_token.cancelled() => {
307 return
308 }
309 };
310
311 let conn = match res {
312 Ok(c) => c,
313 Err(e) => {
314 warn!("There was an error with an incoming connection: {e}");
315 continue;
316 }
317 };
318
319 let transport =
320 serde_transport::new(self.codec_builder.new_framed(conn), Bincode::default());
321
322 let fut = BaseChannel::with_defaults(transport)
323 .execute(self.server.clone().serve())
324 .for_each(spawn);
325
326 let token = self.cancellation_token.child_token();
327 tokio::spawn(async move {
328 tokio::select! {
329 _ = fut => {},
330 _ = token.cancelled() => {}
331 }
332 });
333 }
334 }
335
336 pub fn is_running(&self) -> bool {
338 self.is_running.load(Ordering::Relaxed)
339 }
340
341 pub fn synchro_list(&self) -> ReadWriteSynchroList {
343 self.synchro_list.clone()
344 }
345
346 pub async fn recv_user_commands(&self) -> Option<UserCommand> {
348 let mut receiver = self.commands_chan.lock().await;
349 receiver.recv().await
350 }
351
352 pub async fn create_synchro(&self, synchro: SynchroCreationRequest) -> Result<()> {
354 let info = synchro.info;
355 let created = self.synchro_list.insert(info.clone()).await?;
356
357 self.database.insert_synchro(created, info).await?;
358
359 Ok(())
360 }
361
362 pub async fn delete_synchro(&self, synchro: SynchroDeletionRequest) -> Result<()> {
364 self.synchro_list.remove(synchro.id).await?;
365 self.database.delete_synchro(synchro.id).await?;
366
367 Ok(())
368 }
369
370 pub async fn update_synchro_state(&self, state_request: StateChangeRequest) -> Result<()> {
371 self.synchro_list
372 .read()
373 .await
374 .set_state(state_request.id, state_request.state)
375 .await?;
376 self.database
377 .set_synchro_state(state_request.id, state_request.state)
378 .await?;
379
380 Ok(())
381 }
382
383 pub async fn handle_user_commands(&self, command: UserCommand) -> Result<()> {
385 match command {
386 UserCommand::SynchroCreation(new_synchro) => {
387 if let Err(err) = self.create_synchro(new_synchro).await {
388 let wrapped_err = anyhow!(err);
389 error!("Failed to insert new synchro: {wrapped_err:?}");
390 if self.config.error_mode == ErrorMode::Exit {
391 return Err(wrapped_err);
392 }
393 }
394 }
395
396 UserCommand::SynchroDeletion(to_delete) => {
397 if let Err(err) = self.delete_synchro(to_delete).await {
398 let wrapped_err = anyhow!(err);
399 error!("Failed to delete synchro: {wrapped_err:?}");
400 if self.config.error_mode == ErrorMode::Exit {
401 return Err(wrapped_err);
402 }
403 }
404 }
405 UserCommand::StateChange(state_request) => {
406 if let Err(err) = self.update_synchro_state(state_request).await {
407 let wrapped_err = anyhow!(err);
408 error!("Failed to set synchro state: {wrapped_err:?}");
409 if self.config.error_mode == ErrorMode::Exit {
410 return Err(wrapped_err);
411 }
412 }
413 }
414 UserCommand::ConflictResolution(conflict) => {
415 let res = self
418 .synchro_list
419 .resolve_conflict(conflict.id, &conflict.path, conflict.side, &self.database)
420 .await;
421 if let Err(err) = res {
422 let wrapped_err = anyhow!(err);
423 error!("Failed resolve conflict: {wrapped_err:?}");
424 if self.config.error_mode == ErrorMode::Exit {
425 return Err(wrapped_err);
426 }
427 }
428 }
429 }
430
431 Ok(())
432 }
433}