1use crate::account_loader::load_cnac_files;
31use crate::backend::memory::MemoryBackend;
32use crate::backend::BackendConnection;
33use crate::client_account::ClientNetworkAccount;
34use crate::directory_store::DirectoryStore;
35use crate::misc::{AccountError, CNACMetadata};
36use crate::prelude::CNAC_SERIALIZED_EXTENSION;
37use crate::serialization::SyncIO;
38use async_trait::async_trait;
39use citadel_crypt::ratchets::Ratchet;
40use citadel_crypt::scramble::crypt_splitter::MAX_BYTES_PER_GROUP;
41use citadel_crypt::scramble::streaming_crypt_scrambler::ObjectSource;
42use citadel_io::tokio;
43use citadel_io::tokio_stream::StreamExt;
44use citadel_types::proto::{ObjectTransferStatus, TransferType, VirtualObjectMetadata};
45use citadel_types::user::MutualPeer;
46use std::collections::HashMap;
47use std::path::{Path, PathBuf};
48use tokio::io::AsyncWriteExt;
49use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
50
51pub struct FilesystemBackend<R: Ratchet, Fcm: Ratchet> {
64 memory_backend: MemoryBackend<R, Fcm>,
65 directory_store: Option<DirectoryStore>,
66 home_dir: String,
67}
68
69#[async_trait]
70impl<R: Ratchet, Fcm: Ratchet> BackendConnection<R, Fcm> for FilesystemBackend<R, Fcm> {
71 async fn connect(&mut self) -> Result<(), AccountError> {
75 let directory_store = crate::directory_store::setup_directories(self.home_dir.clone())?;
76 let map = load_cnac_files(&directory_store)?;
77 *self.memory_backend.clients.get_mut() = map;
79 self.directory_store = Some(directory_store);
80
81 Ok(())
82 }
83
84 async fn is_connected(&self) -> Result<bool, AccountError> {
88 Ok(true)
89 }
90
91 #[allow(unused_results)]
95 async fn save_cnac(&self, cnac: &ClientNetworkAccount<R, Fcm>) -> Result<(), AccountError> {
96 let bytes = cnac.generate_proper_bytes()?;
98 let cid = cnac.get_cid();
99 let path = self.generate_cnac_local_save_path(cid, cnac.is_personal());
100 std::fs::write(path, bytes).map_err(|err| AccountError::Generic(err.to_string()))?;
102 self.memory_backend.save_cnac(cnac).await
103 }
104
105 async fn get_cnac_by_cid(
109 &self,
110 cid: u64,
111 ) -> Result<Option<ClientNetworkAccount<R, Fcm>>, AccountError> {
112 self.memory_backend.get_cnac_by_cid(cid).await
113 }
114
115 async fn cid_is_registered(&self, cid: u64) -> Result<bool, AccountError> {
119 self.memory_backend.cid_is_registered(cid).await
120 }
121
122 async fn delete_cnac_by_cid(&self, cid: u64) -> Result<(), AccountError> {
126 let is_personal = self
127 .memory_backend
128 .clients
129 .read()
130 .get(&cid)
131 .ok_or(AccountError::ClientNonExists(cid))?
132 .is_personal();
133 self.memory_backend.delete_cnac_by_cid(cid).await?;
134 let path = self.generate_cnac_local_save_path(cid, is_personal);
135 std::fs::remove_file(path).map_err(|err| AccountError::Generic(err.to_string()))
136 }
137
138 async fn purge(&self) -> Result<usize, AccountError> {
142 let paths = {
143 let mut write = self.memory_backend.clients.write();
144 write
145 .drain()
146 .map(|(cid, cnac)| self.generate_cnac_local_save_path(cid, cnac.is_personal()))
147 .collect::<Vec<PathBuf>>()
148 };
149
150 let count = paths.len();
151
152 for path in paths {
153 tokio::fs::remove_file(path)
154 .await
155 .map_err(|err| AccountError::Generic(err.to_string()))?;
156 }
157
158 let home_dir = self.directory_store.as_ref().unwrap().home.as_str();
160 tokio::fs::remove_dir_all(home_dir)
161 .await
162 .map_err(|err| AccountError::Generic(err.to_string()))?;
163
164 Ok(count)
165 }
166
167 async fn get_registered_impersonal_cids(
171 &self,
172 limit: Option<i32>,
173 ) -> Result<Option<Vec<u64>>, AccountError> {
174 self.memory_backend
175 .get_registered_impersonal_cids(limit)
176 .await
177 }
178
179 async fn get_username_by_cid(&self, cid: u64) -> Result<Option<String>, AccountError> {
183 self.memory_backend.get_username_by_cid(cid).await
184 }
185
186 async fn get_full_name_by_cid(&self, cid: u64) -> Result<Option<String>, AccountError> {
190 self.memory_backend.get_full_name_by_cid(cid).await
191 }
192
193 async fn register_p2p_as_server(&self, cid0: u64, cid1: u64) -> Result<(), AccountError> {
197 self.memory_backend
198 .register_p2p_as_server(cid0, cid1)
199 .await?;
200 let (cnac0, cnac1) = {
201 let read = self.memory_backend.clients.read();
202 let cnac0 = read
203 .get(&cid0)
204 .cloned()
205 .ok_or(AccountError::ClientNonExists(cid0))?;
206 let cnac1 = read
207 .get(&cid1)
208 .cloned()
209 .ok_or(AccountError::ClientNonExists(cid1))?;
210 (cnac0, cnac1)
211 };
212
213 self.save_cnac(&cnac0).await?;
214 self.save_cnac(&cnac1).await
215 }
216
217 async fn register_p2p_as_client(
221 &self,
222 session_cid: u64,
223 peer_cid: u64,
224 peer_username: String,
225 ) -> Result<(), AccountError> {
226 self.memory_backend
227 .register_p2p_as_client(session_cid, peer_cid, peer_username)
228 .await?;
229 self.save_cnac_by_cid(session_cid).await
230 }
231
232 async fn deregister_p2p_as_server(&self, cid0: u64, cid1: u64) -> Result<(), AccountError> {
236 self.memory_backend
237 .deregister_p2p_as_server(cid0, cid1)
238 .await?;
239 let (cnac0, cnac1) = {
240 let read = self.memory_backend.clients.read();
241 let cnac0 = read
242 .get(&cid0)
243 .cloned()
244 .ok_or(AccountError::ClientNonExists(cid0))?;
245 let cnac1 = read
246 .get(&cid1)
247 .cloned()
248 .ok_or(AccountError::ClientNonExists(cid1))?;
249 (cnac0, cnac1)
250 };
251
252 self.save_cnac(&cnac0).await?;
253 self.save_cnac(&cnac1).await
254 }
255
256 async fn deregister_p2p_as_client(
260 &self,
261 session_cid: u64,
262 peer_cid: u64,
263 ) -> Result<Option<MutualPeer>, AccountError> {
264 let res = self
265 .memory_backend
266 .deregister_p2p_as_client(session_cid, peer_cid)
267 .await?;
268 self.save_cnac_by_cid(session_cid).await.map(|_| res)
269 }
270
271 async fn get_hyperlan_peer_list(
275 &self,
276 session_cid: u64,
277 ) -> Result<Option<Vec<u64>>, AccountError> {
278 self.memory_backend
279 .get_hyperlan_peer_list(session_cid)
280 .await
281 }
282
283 async fn get_client_metadata(
287 &self,
288 session_cid: u64,
289 ) -> Result<Option<CNACMetadata>, AccountError> {
290 self.memory_backend.get_client_metadata(session_cid).await
291 }
292
293 async fn get_clients_metadata(
297 &self,
298 limit: Option<i32>,
299 ) -> Result<Vec<CNACMetadata>, AccountError> {
300 self.memory_backend.get_clients_metadata(limit).await
301 }
302
303 async fn get_hyperlan_peer_by_cid(
307 &self,
308 session_cid: u64,
309 peer_cid: u64,
310 ) -> Result<Option<MutualPeer>, AccountError> {
311 self.memory_backend
312 .get_hyperlan_peer_by_cid(session_cid, peer_cid)
313 .await
314 }
315
316 async fn hyperlan_peer_exists(
320 &self,
321 session_cid: u64,
322 peer_cid: u64,
323 ) -> Result<bool, AccountError> {
324 self.memory_backend
325 .hyperlan_peer_exists(session_cid, peer_cid)
326 .await
327 }
328
329 async fn hyperlan_peers_are_mutuals(
333 &self,
334 session_cid: u64,
335 peers: &[u64],
336 ) -> Result<Vec<bool>, AccountError> {
337 self.memory_backend
338 .hyperlan_peers_are_mutuals(session_cid, peers)
339 .await
340 }
341
342 async fn get_hyperlan_peers(
346 &self,
347 session_cid: u64,
348 peers: &[u64],
349 ) -> Result<Vec<MutualPeer>, AccountError> {
350 self.memory_backend
351 .get_hyperlan_peers(session_cid, peers)
352 .await
353 }
354
355 async fn get_hyperlan_peer_list_as_server(
359 &self,
360 session_cid: u64,
361 ) -> Result<Option<Vec<MutualPeer>>, AccountError> {
362 self.memory_backend
363 .get_hyperlan_peer_list_as_server(session_cid)
364 .await
365 }
366
367 async fn synchronize_hyperlan_peer_list_as_client(
371 &self,
372 cnac: &ClientNetworkAccount<R, Fcm>,
373 peers: Vec<MutualPeer>,
374 ) -> Result<(), AccountError> {
375 self.memory_backend
376 .synchronize_hyperlan_peer_list_as_client(cnac, peers)
377 .await?;
378 self.save_cnac(cnac).await
379 }
380
381 async fn get_byte_map_value(
385 &self,
386 session_cid: u64,
387 peer_cid: u64,
388 key: &str,
389 sub_key: &str,
390 ) -> Result<Option<Vec<u8>>, AccountError> {
391 self.memory_backend
392 .get_byte_map_value(session_cid, peer_cid, key, sub_key)
393 .await
394 }
395
396 async fn remove_byte_map_value(
400 &self,
401 session_cid: u64,
402 peer_cid: u64,
403 key: &str,
404 sub_key: &str,
405 ) -> Result<Option<Vec<u8>>, AccountError> {
406 let res = self
407 .memory_backend
408 .remove_byte_map_value(session_cid, peer_cid, key, sub_key)
409 .await?;
410 self.save_cnac_by_cid(session_cid).await.map(|_| res)
411 }
412
413 async fn store_byte_map_value(
417 &self,
418 session_cid: u64,
419 peer_cid: u64,
420 key: &str,
421 sub_key: &str,
422 value: Vec<u8>,
423 ) -> Result<Option<Vec<u8>>, AccountError> {
424 let res = self
425 .memory_backend
426 .store_byte_map_value(session_cid, peer_cid, key, sub_key, value)
427 .await?;
428 self.save_cnac_by_cid(session_cid).await.map(|_| res)
429 }
430
431 async fn get_byte_map_values_by_key(
435 &self,
436 session_cid: u64,
437 peer_cid: u64,
438 key: &str,
439 ) -> Result<HashMap<String, Vec<u8>>, AccountError> {
440 let res = self
441 .memory_backend
442 .get_byte_map_values_by_key(session_cid, peer_cid, key)
443 .await?;
444 self.save_cnac_by_cid(session_cid).await.map(|_| res)
445 }
446
447 async fn remove_byte_map_values_by_key(
451 &self,
452 session_cid: u64,
453 peer_cid: u64,
454 key: &str,
455 ) -> Result<HashMap<String, Vec<u8>>, AccountError> {
456 let res = self
457 .memory_backend
458 .remove_byte_map_values_by_key(session_cid, peer_cid, key)
459 .await?;
460 self.save_cnac_by_cid(session_cid).await.map(|_| res)
461 }
462
463 async fn stream_object_to_backend(
467 &self,
468 source: UnboundedReceiver<Vec<u8>>,
469 sink_metadata: &VirtualObjectMetadata,
470 status_tx: UnboundedSender<ObjectTransferStatus>,
471 ) -> Result<(), AccountError> {
472 let directory_store = self.directory_store.as_ref().unwrap();
473 let is_virtual_file = matches!(
474 sink_metadata.transfer_type,
475 TransferType::RemoteEncryptedVirtualFilesystem { .. }
476 );
477 let metadata = sink_metadata.clone();
478 let file_path = get_file_path(
479 sink_metadata.cid,
480 &sink_metadata.transfer_type,
481 directory_store,
482 Some(metadata.name.as_str()),
483 )
484 .await?;
485
486 let chunk_size = sink_metadata.plaintext_length.min(MAX_BYTES_PER_GROUP);
487
488 log::info!(target: "citadel", "Will stream object to {file_path:?}");
489 let file = tokio::fs::File::create(&file_path)
490 .await
491 .map_err(|err| AccountError::IoError(err.to_string()))?;
492
493 let _ = status_tx.send(ObjectTransferStatus::ReceptionBeginning(
494 file_path.clone(),
495 sink_metadata.clone(),
496 ));
497
498 let mut size = 0;
499 let mut writer = tokio::io::BufWriter::new(file);
500 let reader = citadel_io::tokio_util::io::StreamReader::new(
501 citadel_io::tokio_stream::wrappers::UnboundedReceiverStream::new(source).map(|r| {
502 log::trace!(target: "citadel", "Received {} byte chunk", r.len());
503 size += r.len();
504 Ok(std::io::Cursor::new(r)) as Result<std::io::Cursor<Vec<u8>>, std::io::Error>
505 }),
506 );
507
508 let mut reader = tokio::io::BufReader::with_capacity(chunk_size, reader);
509
510 if is_virtual_file {
511 let metadata_path = get_revfs_file_metadata_path(&file_path);
513 let serialized = metadata.serialize_to_vector()?;
514 tokio::fs::write(metadata_path, serialized)
515 .await
516 .map_err(|err| AccountError::IoError(err.to_string()))?
517 }
518
519 match tokio::io::copy(&mut reader, &mut writer).await {
520 Ok(bytes_written) => {
521 log::info!(target: "citadel", "Successfully wrote {bytes_written} bytes to {file_path:?}");
522 }
523
524 Err(err) => {
525 log::error!(target: "citadel", "Error while copying from reader to writer: {err}");
526 status_tx
527 .send(ObjectTransferStatus::Fail(err.to_string()))
528 .map_err(|err| AccountError::IoError(err.to_string()))?;
529 return Err(AccountError::IoError(err.to_string()));
530 }
531 }
532
533 writer
534 .flush()
535 .await
536 .map_err(|err| AccountError::IoError(err.to_string()))?;
537
538 writer
539 .into_inner()
540 .sync_all()
541 .await
542 .map_err(|err| AccountError::IoError(err.to_string()))?;
543
544 Ok(())
559 }
560
561 async fn revfs_get_file_info(
565 &self,
566 cid: u64,
567 virtual_path: std::path::PathBuf,
568 ) -> Result<(Box<dyn ObjectSource>, VirtualObjectMetadata), AccountError> {
569 let directory_store = self.directory_store.as_ref().unwrap();
570 let file_path = get_file_path(
571 cid,
572 &TransferType::RemoteEncryptedVirtualFilesystem {
573 virtual_path,
574 security_level: Default::default(),
575 },
576 directory_store,
577 None,
578 )
579 .await?;
580
581 let metadata_path = get_revfs_file_metadata_path(&file_path);
582 let raw_metadata = tokio::fs::read(&metadata_path)
585 .await
586 .map_err(|err| AccountError::IoError(err.to_string()))?;
587 let metadata: VirtualObjectMetadata =
588 VirtualObjectMetadata::deserialize_from_owned_vector(raw_metadata)?;
589
590 Ok((Box::new(file_path), metadata))
591 }
592
593 async fn revfs_delete(
597 &self,
598 cid: u64,
599 virtual_path: std::path::PathBuf,
600 ) -> Result<(), AccountError> {
601 let directory_store = self.directory_store.as_ref().unwrap();
602 let file_path = get_file_path(
603 cid,
604 &TransferType::RemoteEncryptedVirtualFilesystem {
605 virtual_path,
606 security_level: Default::default(),
607 },
608 directory_store,
609 None,
610 )
611 .await?;
612 let metadata_path = get_revfs_file_metadata_path(&file_path);
613
614 delete_paths(&[metadata_path, file_path]).await
615 }
616}
617
618impl<R: Ratchet, Fcm: Ratchet> FilesystemBackend<R, Fcm> {
619 async fn save_cnac_by_cid(&self, cid: u64) -> Result<(), AccountError> {
623 let cnac = self
624 .memory_backend
625 .clients
626 .read()
627 .get(&cid)
628 .cloned()
629 .ok_or(AccountError::ClientNonExists(cid))?;
630 self.save_cnac(&cnac).await
631 }
632
633 fn generate_cnac_local_save_path(&self, cid: u64, is_personal: bool) -> PathBuf {
637 let dirs = self.directory_store.as_ref().unwrap();
638 if is_personal {
639 PathBuf::from(format!(
640 "{}{}.{}",
641 dirs.nac_dir_personal.as_str(),
642 cid,
643 CNAC_SERIALIZED_EXTENSION
644 ))
645 } else {
646 PathBuf::from(format!(
647 "{}{}.{}",
648 dirs.nac_dir_impersonal.as_str(),
649 cid,
650 CNAC_SERIALIZED_EXTENSION
651 ))
652 }
653 }
654}
655
656impl<R: Ratchet, Fcm: Ratchet> From<String> for FilesystemBackend<R, Fcm> {
657 fn from(home_dir: String) -> Self {
661 Self {
662 home_dir,
663 memory_backend: MemoryBackend::default(),
664 directory_store: None,
665 }
666 }
667}
668
669async fn get_file_path(
673 source_cid: u64,
674 transfer_type: &TransferType,
675 directory_store: &DirectoryStore,
676 target_name: Option<&str>,
677) -> Result<PathBuf, AccountError> {
678 match transfer_type {
679 TransferType::FileTransfer => {
680 let name = target_name.ok_or_else(|| {
683 AccountError::IoError(
684 "File transfer type specified, yet, no target name given".into(),
685 )
686 })?;
687 let save_path = directory_store.file_transfer_dir.as_str();
688 let mut base_path = PathBuf::from(format!("{save_path}{source_cid}"));
689
690 tokio::fs::create_dir_all(&base_path)
692 .await
693 .map_err(|err| AccountError::IoError(err.to_string()))?;
694
695 base_path.push(name);
697
698 Ok(base_path)
699 }
700 TransferType::RemoteEncryptedVirtualFilesystem { virtual_path, .. } => {
701 let virtual_dir = &crate::misc::prepare_virtual_path(virtual_path);
702 crate::misc::validate_virtual_path(virtual_dir)?;
703 let save_path = directory_store.virtual_dir.as_str();
704 let file_path =
705 PathBuf::from(format!("{save_path}{source_cid}{}", virtual_dir.display()));
706 let mut file_path_dir = file_path.clone();
708 let _ = file_path_dir.pop();
709
710 tokio::fs::create_dir_all(&file_path_dir)
712 .await
713 .map_err(|err| AccountError::IoError(err.to_string()))?;
714 Ok(file_path)
715 }
716 }
717}
718
719fn get_revfs_file_metadata_path<P: AsRef<Path>>(path: P) -> PathBuf {
723 let mut metadata_path = format!("{}", path.as_ref().display());
724 metadata_path.push_str(crate::misc::VIRTUAL_FILE_METADATA_EXT);
725 crate::misc::prepare_virtual_path(metadata_path)
726}
727
728async fn delete_paths<T: AsRef<Path>, R: AsRef<[T]>>(paths: R) -> Result<(), AccountError> {
732 let paths = paths.as_ref();
733 for path in paths {
734 tokio::fs::remove_file(path)
735 .await
736 .map_err(|err| AccountError::IoError(err.to_string()))?;
737 }
738
739 Ok(())
740}