use crate::core::Result;
use crate::fs::config::{get_root_dir, is_binary};
use crate::fs::rate_limiter::ReconnectRateLimiter;
use braid_blob::BlobStore;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Per-URL state for an active binary (blob) synchronization session.
#[derive(Debug)]
pub struct BinarySyncState {
    /// The resource URL this state tracks.
    pub url: String,
    /// Short peer id (first 12 chars of a freshly generated v4 UUID, or the
    /// value restored from persisted metadata).
    pub peer: String,
    /// Merge-type label persisted in the metadata file ("aww" by default).
    pub merge_type: String,
    /// Last-seen file mtime as nanoseconds-since-epoch, stringified;
    /// `None` until the file has been read/uploaded at least once.
    pub file_mtime_ns_str: Option<String>,
    /// Whether the on-disk file is read-only.
    /// NOTE(review): never written in this file — confirm who sets it.
    pub file_read_only: Option<bool>,
    /// Set by `disconnect` to signal that this sync should stop.
    pub aborted: bool,
}
impl BinarySyncState {
    /// Creates fresh sync state for `url` with a newly minted short peer id,
    /// the default "aww" merge type, and no recorded file mtime.
    pub fn new(url: String) -> Self {
        // The peer id is the first 12 characters of a v4 UUID's canonical
        // (ASCII hex + dashes) string form.
        let peer: String = uuid::Uuid::new_v4().to_string().chars().take(12).collect();
        Self {
            url,
            peer,
            merge_type: String::from("aww"),
            file_mtime_ns_str: None,
            file_read_only: None,
            aborted: false,
        }
    }
}
/// Coordinates binary (blob) file synchronization across multiple URLs:
/// tracks per-URL state, persists metadata to disk, and pushes file
/// contents into the blob store when the on-disk mtime changes.
#[derive(Debug)]
pub struct BinarySyncManager {
    /// Active sync state, keyed by URL.
    syncs: Arc<RwLock<HashMap<String, BinarySyncState>>>,
    /// Throttles reconnect attempts per URL.
    rate_limiter: Arc<ReconnectRateLimiter>,
    /// Blob storage backend; always `Some` after `new` (see `blob_store()`,
    /// which `expect`s on it).
    blob_store: Option<Arc<BlobStore>>,
    /// Scratch folder used by atomic writes ( `<root>/.braidfs/temp` ).
    temp_folder: PathBuf,
    /// Folder holding per-URL JSON metadata ( `<root>/.braidfs/braid-blob-meta` ).
    meta_folder: PathBuf,
}
impl BinarySyncManager {
    /// Creates a manager rooted at `<root>/.braidfs`, using `temp` for
    /// atomic-write scratch files and `braid-blob-meta` for per-URL metadata.
    ///
    /// # Errors
    /// Returns `BraidError::Config` if the root directory cannot be resolved.
    pub fn new(
        rate_limiter: Arc<ReconnectRateLimiter>,
        blob_store: Arc<BlobStore>,
    ) -> Result<Self> {
        let root = get_root_dir().map_err(|e| crate::core::BraidError::Config(e.to_string()))?;
        let braidfs_dir = root.join(".braidfs");
        Ok(Self {
            syncs: Arc::new(RwLock::new(HashMap::new())),
            rate_limiter,
            blob_store: Some(blob_store),
            temp_folder: braidfs_dir.join("temp"),
            meta_folder: braidfs_dir.join("braid-blob-meta"),
        })
    }

    /// Initializes (or resumes) binary sync for `url`, restoring the peer id
    /// and last-seen mtime from persisted metadata when available, then scans
    /// the file at `fullpath` to decide whether an upload is needed.
    pub async fn init_binary_sync(&self, url: &str, fullpath: &Path) -> Result<()> {
        tracing::info!("init_binary_sync: {}", url);
        let mut state = BinarySyncState::new(url.to_string());
        // Resume a previous session if metadata was persisted for this URL.
        // A missing/corrupt meta file silently falls back to fresh state.
        let meta_path = self.get_meta_path(url);
        if let Ok(content) = tokio::fs::read_to_string(&meta_path).await {
            if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&content) {
                if let Some(peer) = meta.get("peer").and_then(|v| v.as_str()) {
                    state.peer = peer.to_string();
                }
                if let Some(mtime) = meta.get("file_mtime_ns_str").and_then(|v| v.as_str()) {
                    state.file_mtime_ns_str = Some(mtime.to_string());
                }
            }
        }
        self.save_meta(url, &state).await?;
        // Register the state BEFORE scanning the file: signal_file_needs_reading
        // compares against (and records into) the map entry. The previous order
        // left the map empty during the scan, so the mtime restored from meta
        // was never consulted and every startup forced a re-upload.
        self.syncs.write().await.insert(url.to_string(), state);
        self.signal_file_needs_reading(url, fullpath).await?;
        Ok(())
    }

    /// Checks the file at `fullpath` and, if its mtime differs from the one
    /// recorded for `url` (or no state is recorded), uploads its contents to
    /// the blob store and persists the new mtime.
    ///
    /// A missing file is not an error — there is simply nothing to upload.
    pub async fn signal_file_needs_reading(&self, url: &str, fullpath: &Path) -> Result<()> {
        let metadata = match tokio::fs::metadata(fullpath).await {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(crate::core::BraidError::Io(e)),
        };
        // Nanoseconds since the epoch, stringified; degrades to "0" on
        // platforms/filesystems without a usable mtime.
        let mtime_str = metadata
            .modified()
            .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos()
            .to_string();
        let needs_upload = {
            let syncs = self.syncs.read().await;
            syncs
                .get(url)
                .map_or(true, |state| state.file_mtime_ns_str.as_ref() != Some(&mtime_str))
        };
        if needs_upload {
            let data = tokio::fs::read(fullpath).await?;
            if let Some(store) = &self.blob_store {
                store
                    .put(url, data.into(), vec![], vec![], None)
                    .await
                    .map_err(crate::core::BraidError::Client)?;
            }
            // Hold the write guard across save_meta (tokio's RwLock supports
            // awaiting while held) so the entry cannot be removed between
            // updating the mtime and persisting it. The previous code dropped
            // the guard and then `unwrap()`ed a re-read of the map, which
            // could panic if a concurrent disconnect removed the entry.
            let mut syncs = self.syncs.write().await;
            if let Some(state) = syncs.get_mut(url) {
                state.file_mtime_ns_str = Some(mtime_str);
                self.save_meta(url, state).await?;
            }
        }
        Ok(())
    }

    /// Atomically persists `state`'s merge type, peer id, and last-seen mtime
    /// as pretty-printed JSON under the meta folder.
    async fn save_meta(&self, url: &str, state: &BinarySyncState) -> Result<()> {
        tokio::fs::create_dir_all(&self.meta_folder).await?;
        let meta = serde_json::json!({
            "merge_type": state.merge_type,
            "peer": state.peer,
            "file_mtime_ns_str": state.file_mtime_ns_str,
        });
        let meta_path = self.get_meta_path(url);
        braid_blob::store::atomic_write(
            &meta_path,
            serde_json::to_string_pretty(&meta)?.as_bytes(),
            &self.temp_folder,
        )
        .await
        .map_err(crate::core::BraidError::Client)?;
        Ok(())
    }

    /// Returns the metadata file path for `url` (filename-encoded URL inside
    /// the meta folder).
    fn get_meta_path(&self, url: &str) -> PathBuf {
        let encoded = braid_blob::store::encode_filename(url);
        self.meta_folder.join(encoded)
    }

    /// Marks the sync for `url` as aborted (if present) and notifies the
    /// rate limiter of the disconnect.
    pub async fn disconnect(&self, url: &str) {
        let mut syncs = self.syncs.write().await;
        if let Some(state) = syncs.get_mut(url) {
            state.aborted = true;
        }
        self.rate_limiter.on_diss(url).await;
    }

    /// Waits for the rate limiter's turn, records the (re)connection, and
    /// rescans the file in case it changed while disconnected.
    pub async fn reconnect(&self, url: &str, fullpath: &Path) -> Result<()> {
        self.rate_limiter.get_turn(url).await;
        self.rate_limiter.on_conn(url).await;
        self.signal_file_needs_reading(url, fullpath).await
    }

    /// Returns the blob store handle.
    ///
    /// # Panics
    /// Panics if the store was never set — cannot happen for managers built
    /// via `new`, which always stores `Some`.
    pub fn blob_store(&self) -> Arc<BlobStore> {
        self.blob_store
            .clone()
            .expect("BlobStore must be initialized")
    }
}
/// Minimal key-value-style facade over a single file on disk, used as the
/// local storage for one binary sync. The `_key` arguments of its methods
/// are ignored — every operation targets `fullpath`.
pub struct BinarySyncDb {
    /// Path of the synced file.
    fullpath: PathBuf,
    /// Scratch folder used by atomic writes.
    temp_folder: PathBuf,
}
impl BinarySyncDb {
    /// Creates a db facade over `fullpath`, using `temp_folder` for the
    /// intermediate files of atomic writes.
    pub fn new(fullpath: PathBuf, temp_folder: PathBuf) -> Self {
        Self { fullpath, temp_folder }
    }

    /// Reads the entire file, returning `Ok(None)` when it does not exist.
    /// The `_key` argument is ignored.
    pub async fn read(&self, _key: &str) -> Result<Option<Vec<u8>>> {
        match tokio::fs::read(&self.fullpath).await {
            Ok(bytes) => Ok(Some(bytes)),
            Err(err) => {
                if err.kind() == std::io::ErrorKind::NotFound {
                    Ok(None)
                } else {
                    Err(crate::core::BraidError::Io(err))
                }
            }
        }
    }

    /// Atomically replaces the file's contents with `data`, then returns the
    /// file's fresh metadata. The `_key` argument is ignored.
    pub async fn write(&self, _key: &str, data: &[u8]) -> Result<std::fs::Metadata> {
        braid_blob::store::atomic_write(&self.fullpath, data, &self.temp_folder)
            .await
            .map_err(crate::core::BraidError::Client)?;
        tokio::fs::metadata(&self.fullpath)
            .await
            .map_err(crate::core::BraidError::Io)
    }

    /// Removes the file; a missing file counts as success.
    /// The `_key` argument is ignored.
    pub async fn delete(&self, _key: &str) -> Result<()> {
        if let Err(err) = tokio::fs::remove_file(&self.fullpath).await {
            if err.kind() != std::io::ErrorKind::NotFound {
                return Err(crate::core::BraidError::Io(err));
            }
        }
        Ok(())
    }
}
/// Returns `true` when `path` should take the binary (blob) sync path
/// rather than text sync, as decided by the config-level `is_binary` check.
pub fn should_use_binary_sync(path: &str) -> bool {
    is_binary(path)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check that binary extensions route to binary sync and
    /// text extensions do not.
    #[test]
    fn test_should_use_binary_sync() {
        let cases = [
            ("image.jpg", true),
            ("document.pdf", true),
            ("readme.txt", false),
            ("code.rs", false),
        ];
        for (path, expected) in cases {
            assert_eq!(should_use_binary_sync(path), expected, "path: {path}");
        }
    }
}