#![allow(dead_code)]
use anyhow::{anyhow, Context, Result};
use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::config::SharedCredentialsProvider;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use walkdir::WalkDir;
/// Persisted settings for cloud sync.
///
/// Stored as `sync_config.json` (see `load_sync_config` / `save_sync_config`).
/// `Debug` is implemented by hand so the secret key is never printed.
#[derive(Clone, Serialize, Deserialize)]
pub struct SyncConfig {
    /// Master switch; sync is only active when this is `true`.
    pub enabled: bool,
    /// Storage backend to use.
    pub provider: SyncProvider,
    /// Target bucket name; must be non-empty for sync to be considered enabled.
    pub bucket: String,
    /// Optional custom endpoint URL passed to the S3 client builder.
    pub endpoint: Option<String>,
    /// Region name; an empty string falls back to `us-east-1` when the client is built.
    pub region: String,
    /// Static access key id. Only used when `secret_access_key` is also set.
    pub access_key_id: Option<String>,
    /// Static secret key. Only used when `access_key_id` is also set.
    pub secret_access_key: Option<String>,
    /// Optional key prefix prepended (with a `/`) to every uploaded object key.
    pub prefix: Option<String>,
    /// Forwarded to the S3 client builder's `force_path_style`.
    pub force_path_style: bool,
    /// Not read in this file; presumably consumed by a scheduler elsewhere (confirm).
    pub auto_sync: bool,
    /// Not read in this file; presumably the auto-sync period in minutes (confirm).
    pub sync_interval_minutes: u32,
}

// Manual `Debug`: identical field listing to the derive, except that the
// secret key is replaced by a placeholder so it cannot leak into logs.
impl std::fmt::Debug for SyncConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let redacted_secret = self.secret_access_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("SyncConfig")
            .field("enabled", &self.enabled)
            .field("provider", &self.provider)
            .field("bucket", &self.bucket)
            .field("endpoint", &self.endpoint)
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &redacted_secret)
            .field("prefix", &self.prefix)
            .field("force_path_style", &self.force_path_style)
            .field("auto_sync", &self.auto_sync)
            .field("sync_interval_minutes", &self.sync_interval_minutes)
            .finish()
    }
}
/// Storage backend selector for cloud sync.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SyncProvider {
/// No backend selected; `CloudSync::is_enabled` returns `false` for this value.
None,
/// S3 accessed through `aws_sdk_s3` (a custom endpoint can be set in `SyncConfig`).
S3,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
enabled: false,
provider: SyncProvider::None,
bucket: String::new(),
endpoint: None,
region: "us-east-1".to_string(),
access_key_id: None,
secret_access_key: None,
prefix: None,
force_path_style: false,
auto_sync: false,
sync_interval_minutes: 60,
}
}
}
/// Outcome summary of one upload (`sync`) or download pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStatus {
/// RFC 3339 UTC timestamp taken when the pass finished; `None` when sync is disabled.
pub last_sync: Option<String>,
/// Number of local files (upload) or listed objects (download) considered.
pub total_files: usize,
/// Number of transfers that succeeded.
pub synced_files: usize,
/// Number of transfers that failed during this pass.
pub pending_files: usize,
/// Human-readable error messages collected during the pass.
pub errors: Vec<String>,
}
/// Mirrors a local directory to and from an S3 bucket.
pub struct CloudSync {
// Connection and behaviour settings.
config: SyncConfig,
// Root directory whose files are uploaded / into which objects are downloaded.
local_dir: PathBuf,
}
impl CloudSync {
pub fn new(local_dir: PathBuf, config: SyncConfig) -> Self {
Self { local_dir, config }
}
/// Returns `true` only when sync is switched on, the provider is S3, and a
/// bucket name has been configured.
pub fn is_enabled(&self) -> bool {
    let cfg = &self.config;
    if !cfg.enabled || cfg.bucket.is_empty() {
        return false;
    }
    matches!(cfg.provider, SyncProvider::S3)
}
pub fn get_config(&self) -> &SyncConfig {
&self.config
}
/// Replaces the configuration; the previous one is dropped.
pub fn update_config(&mut self, config: SyncConfig) {
    let _previous = std::mem::replace(&mut self.config, config);
}
/// Builds an S3 client from this syncer's configuration.
async fn build_client(&self) -> Result<Client> {
    let client = build_s3_client(&self.config).await?;
    Ok(client)
}
/// Maps a path relative to the local directory to its object key.
///
/// Path components are joined with `/` regardless of platform. When a
/// non-empty prefix is configured, the key becomes `<prefix>/<rel>` with any
/// trailing slashes on the prefix removed first.
fn key_for(&self, rel: &Path) -> String {
    let mut rel_key = String::new();
    for (idx, part) in rel.components().enumerate() {
        if idx > 0 {
            rel_key.push('/');
        }
        rel_key.push_str(&part.as_os_str().to_string_lossy());
    }

    let prefix = self.config.prefix.as_deref().unwrap_or("");
    if prefix.is_empty() {
        rel_key
    } else {
        format!("{}/{}", prefix.trim_end_matches('/'), rel_key)
    }
}
/// Uploads every regular file under the local directory to the bucket.
///
/// Returns a [`SyncStatus`] describing the pass. When sync is not enabled the
/// status carries a single "Sync is not enabled" error and no timestamp.
/// Individual upload failures do not abort the pass; each is recorded in
/// `errors` and counted in `pending_files`.
///
/// # Errors
/// Fails only if the client cannot be built or the local directory cannot be
/// scanned.
pub async fn sync(&self) -> Result<SyncStatus> {
    if !self.is_enabled() {
        return Ok(SyncStatus {
            last_sync: None,
            total_files: 0,
            synced_files: 0,
            pending_files: 0,
            errors: vec!["Sync is not enabled".to_string()],
        });
    }

    let client = self.build_client().await?;
    let files = self.collect_files()?;
    let total = files.len();
    let mut synced = 0usize;
    let mut pending = 0usize;
    let mut errors: Vec<String> = Vec::new();

    for (abs, rel) in &files {
        let key = self.key_for(rel);
        match self.upload_one(&client, abs, &key).await {
            Ok(()) => synced += 1,
            Err(e) => {
                pending += 1;
                // `{:#}` prints the whole anyhow cause chain; plain `{}` would
                // show only the outer context and hide the actual failure.
                errors.push(format!("{}: {:#}", key, e));
            }
        }
    }

    Ok(SyncStatus {
        last_sync: Some(chrono::Utc::now().to_rfc3339()),
        total_files: total,
        synced_files: synced,
        pending_files: pending,
        errors,
    })
}
/// Uploads the file at `abs` to the configured bucket under `key`.
///
/// # Errors
/// Returns an error (with context naming the file or key) if the file cannot
/// be opened as a byte stream or the `PutObject` request fails.
async fn upload_one(&self, client: &Client, abs: &Path, key: &str) -> Result<()> {
    let payload = ByteStream::from_path(abs)
        .await
        .with_context(|| format!("reading {}", abs.display()))?;

    let request = client
        .put_object()
        .bucket(&self.config.bucket)
        .key(key)
        .body(payload);

    request
        .send()
        .await
        .with_context(|| format!("PutObject {}", key))
        .map(|_| ())
}
fn collect_files(&self) -> Result<Vec<(PathBuf, PathBuf)>> {
let mut out = Vec::new();
for entry in WalkDir::new(&self.local_dir)
.follow_links(false)
.into_iter()
.filter_map(|e| e.ok())
{
if !entry.file_type().is_file() {
continue;
}
let abs = entry.path().to_path_buf();
let rel = abs
.strip_prefix(&self.local_dir)
.map_err(|e| anyhow!("strip_prefix failed for {}: {}", abs.display(), e))?
.to_path_buf();
out.push((abs, rel));
}
Ok(out)
}
/// Downloads every object under the configured prefix into the local directory.
///
/// Returns a [`SyncStatus`] describing the pass. When sync is not enabled the
/// status carries a single "Sync is not enabled" error and no timestamp.
/// Individual download failures do not abort the pass; each is recorded in
/// `errors` and counted in `pending_files`.
///
/// # Errors
/// Fails if the client cannot be built or a `ListObjectsV2` request fails.
pub async fn download(&self) -> Result<SyncStatus> {
    if !self.is_enabled() {
        return Ok(SyncStatus {
            last_sync: None,
            total_files: 0,
            synced_files: 0,
            pending_files: 0,
            errors: vec!["Sync is not enabled".to_string()],
        });
    }

    let client = self.build_client().await?;

    // Uploads always produce keys of the form `<prefix>/<rel>`. Listing with
    // the bare prefix would also match sibling prefixes (`user4` would pull
    // `user42/...`), so list with exactly one trailing slash.
    let prefix_filter = match self.config.prefix.as_deref() {
        Some(p) if !p.is_empty() => format!("{}/", p.trim_end_matches('/')),
        _ => String::new(),
    };

    let mut errors: Vec<String> = Vec::new();
    let mut continuation: Option<String> = None;
    let mut keys: Vec<String> = Vec::new();
    loop {
        let mut req = client.list_objects_v2().bucket(&self.config.bucket);
        if !prefix_filter.is_empty() {
            req = req.prefix(&prefix_filter);
        }
        if let Some(token) = &continuation {
            req = req.continuation_token(token);
        }
        let resp = req.send().await.context("ListObjectsV2")?;
        for obj in resp.contents() {
            if let Some(k) = obj.key() {
                keys.push(k.to_string());
            }
        }
        if !resp.is_truncated().unwrap_or(false) {
            break;
        }
        continuation = resp.next_continuation_token().map(|s| s.to_string());
        if continuation.is_none() {
            // The service said there is more but gave no way to fetch it;
            // report it rather than pretend the listing is complete.
            errors.push(
                "ListObjectsV2: truncated response without continuation token; \
                 listing is incomplete"
                    .to_string(),
            );
            break;
        }
    }

    let total = keys.len();
    let mut synced = 0usize;
    let mut pending = 0usize;
    for key in &keys {
        match self.download_one(&client, key, &prefix_filter).await {
            Ok(()) => synced += 1,
            Err(e) => {
                pending += 1;
                // `{:#}` keeps the underlying cause, not just the outer context.
                errors.push(format!("{}: {:#}", key, e));
            }
        }
    }

    Ok(SyncStatus {
        last_sync: Some(chrono::Utc::now().to_rfc3339()),
        total_files: total,
        synced_files: synced,
        pending_files: pending,
        errors,
    })
}
async fn download_one(&self, client: &Client, key: &str, prefix: &str) -> Result<()> {
let rel = if !prefix.is_empty() {
key.strip_prefix(prefix)
.map(|s| s.trim_start_matches('/'))
.unwrap_or(key)
} else {
key
};
if rel.is_empty() {
return Ok(());
}
let dest = self.local_dir.join(rel);
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent)
.await
.with_context(|| format!("create_dir_all {}", parent.display()))?;
}
let resp = client
.get_object()
.bucket(&self.config.bucket)
.key(key)
.send()
.await
.with_context(|| format!("GetObject {}", key))?;
let mut stream = resp.body.into_async_read();
let mut file = fs::File::create(&dest)
.await
.with_context(|| format!("create {}", dest.display()))?;
tokio::io::copy(&mut stream, &mut file)
.await
.with_context(|| format!("write {}", dest.display()))?;
file.flush().await.ok();
Ok(())
}
pub async fn restore(&self, backup_id: &str) -> Result<()> {
if !self.is_enabled() {
return Err(anyhow!("Sync is not enabled"));
}
let mut tmp = self.config.clone();
tmp.prefix = Some(match &self.config.prefix {
Some(p) if !p.is_empty() => format!("{}/{}", p.trim_end_matches('/'), backup_id),
_ => backup_id.to_string(),
});
let nested = CloudSync {
config: tmp,
local_dir: self.local_dir.clone(),
};
let status = nested.download().await?;
if !status.errors.is_empty() {
return Err(anyhow!(
"restore completed with {} errors; first: {}",
status.errors.len(),
status.errors[0]
));
}
Ok(())
}
}
/// Builds an S3 client from `cfg`.
///
/// * A blank region falls back to `us-east-1`.
/// * Static credentials are used only when both the access key id and the
///   secret key are present and non-blank; otherwise the loader's default
///   credential resolution from `aws_config::defaults` applies.
/// * A non-blank `endpoint` overrides the service endpoint URL.
///
/// Blank strings are treated the same as `None`, so an empty form field does
/// not turn into an invalid endpoint or an empty credential.
///
/// # Errors
/// Currently always returns `Ok`; the `Result` is kept for API stability.
pub async fn build_s3_client(cfg: &SyncConfig) -> Result<Client> {
    // Treats `Some("")` / whitespace-only values as "not configured".
    fn non_blank(value: &Option<String>) -> Option<&str> {
        value.as_deref().filter(|s| !s.trim().is_empty())
    }

    let region = if cfg.region.trim().is_empty() {
        "us-east-1".to_string()
    } else {
        cfg.region.clone()
    };
    let mut loader = aws_config::defaults(BehaviorVersion::latest()).region(Region::new(region));

    if let (Some(id), Some(secret)) = (
        non_blank(&cfg.access_key_id),
        non_blank(&cfg.secret_access_key),
    ) {
        let creds = Credentials::new(id, secret, None, None, "i-self-static");
        loader = loader.credentials_provider(SharedCredentialsProvider::new(creds));
    }

    let shared = loader.load().await;
    let mut s3_builder =
        aws_sdk_s3::config::Builder::from(&shared).force_path_style(cfg.force_path_style);
    if let Some(endpoint) = non_blank(&cfg.endpoint) {
        s3_builder = s3_builder.endpoint_url(endpoint);
    }
    Ok(Client::from_conf(s3_builder.build()))
}
pub async fn load_sync_config(dir: &Path) -> Result<SyncConfig> {
let config_path = dir.join("sync_config.json");
if !config_path.exists() {
return Ok(SyncConfig::default());
}
let content = fs::read_to_string(&config_path).await?;
match serde_json::from_str::<SyncConfig>(&content) {
Ok(config) => Ok(config),
Err(parse_err) => {
tracing::warn!(
"Could not parse {} ({}). This is likely a pre-0.4 config — \
the schema changed when sync became real. Falling back to \
defaults; reconfigure with [cloud] in config.toml or the \
ISELF_SYNC_* env vars (see README → Cloud sync).",
config_path.display(),
parse_err
);
Ok(SyncConfig::default())
}
}
}
/// Writes `config` as pretty-printed JSON to `sync_config.json` in `dir`,
/// creating `dir` (and any missing parents) first.
///
/// NOTE(review): the secret access key is written in plain text; consider
/// restricting the file's permissions where this is deployed.
///
/// # Errors
/// Fails if the directory cannot be created, the config cannot be
/// serialized, or the file cannot be written.
pub async fn save_sync_config(dir: &Path, config: &SyncConfig) -> Result<()> {
    // `create_dir_all` succeeds when the directory already exists, so no
    // separate (blocking, racy) `exists()` check is needed.
    fs::create_dir_all(dir)
        .await
        .with_context(|| format!("creating {}", dir.display()))?;

    let config_path = dir.join("sync_config.json");
    let content = serde_json::to_string_pretty(config).context("serializing sync config")?;
    fs::write(&config_path, content)
        .await
        .with_context(|| format!("writing {}", config_path.display()))?;
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
/// Baseline enabled S3 configuration shared by the unit tests.
fn cfg() -> SyncConfig {
    let mut base = SyncConfig::default();
    base.enabled = true;
    base.provider = SyncProvider::S3;
    base.bucket = String::from("mybucket");
    base
}
/// Without a prefix the key is the relative path joined with `/`.
#[test]
fn key_for_no_prefix_uses_forward_slashes() {
    let syncer = CloudSync::new(PathBuf::from("/local"), cfg());
    assert_eq!(syncer.key_for(Path::new("a/b/c.txt")), "a/b/c.txt");
}
/// A configured prefix is joined to the relative path with exactly one `/`.
#[test]
fn key_for_with_prefix_joins_with_single_slash() {
    let prefixed = SyncConfig {
        prefix: Some("user42".to_string()),
        ..cfg()
    };
    let syncer = CloudSync::new(PathBuf::from("/local"), prefixed);
    let key = syncer.key_for(Path::new("snippets/x.json"));
    assert_eq!(key, "user42/snippets/x.json");
}
/// A trailing slash on the prefix must not produce a double slash in the key.
#[test]
fn key_for_strips_trailing_slash_in_prefix() {
    let prefixed = SyncConfig {
        prefix: Some("user42/".to_string()),
        ..cfg()
    };
    let syncer = CloudSync::new(PathBuf::from("/local"), prefixed);
    let key = syncer.key_for(Path::new("a.txt"));
    assert_eq!(key, "user42/a.txt");
}
/// An empty bucket name disables sync even when everything else is set.
#[test]
fn is_enabled_requires_bucket() {
    let without_bucket = SyncConfig {
        bucket: String::new(),
        ..cfg()
    };
    let syncer = CloudSync::new(PathBuf::from("/local"), without_bucket);
    assert!(!syncer.is_enabled());
}
/// Provider `None` disables sync even when `enabled` and the bucket are set.
#[test]
fn is_enabled_requires_provider() {
    let without_provider = SyncConfig {
        provider: SyncProvider::None,
        ..cfg()
    };
    let syncer = CloudSync::new(PathBuf::from("/local"), without_provider);
    assert!(!syncer.is_enabled());
}
/// A config file that no longer matches the schema must load as the default
/// configuration instead of failing.
#[tokio::test]
async fn legacy_sync_config_falls_back_to_default_with_warning() {
    let workdir = tempfile::tempdir().unwrap();
    let legacy_path = workdir.path().join("sync_config.json");
    // Kept verbatim (including layout) so the fixture bytes are unchanged.
    let legacy = r#"{
"enabled": true,
"provider": "Custom",
"endpoint": "https://example.com",
"api_key": "leftover-from-0.3",
"auto_sync": false,
"sync_interval_minutes": 60
}"#;
    tokio::fs::write(legacy_path, legacy).await.unwrap();

    let loaded = load_sync_config(workdir.path()).await.unwrap();

    assert!(!loaded.enabled);
    assert_eq!(loaded.provider, SyncProvider::None);
    assert!(loaded.bucket.is_empty());
}
#[tokio::test]
async fn save_and_load_round_trip() {
let dir = tempfile::tempdir().unwrap();
let mut c = cfg();
c.endpoint = Some("http://localhost:9000".to_string());
c.access_key_id = Some("minio".to_string());
c.secret_access_key = Some("minio-secret".to_string());
save_sync_config(dir.path(), &c).await.unwrap();
let loaded = load_sync_config(dir.path()).await.unwrap();
assert_eq!(loaded.bucket, c.bucket);
assert_eq!(loaded.endpoint, c.endpoint);
assert_eq!(loaded.access_key_id, c.access_key_id);
}
/// End-to-end upload/download round trip against a live S3-compatible server.
///
/// Ignored by default; run it explicitly with `cargo test -- --ignored`.
///
/// Setup (environment variables):
/// * `ISELF_TEST_MINIO_ENDPOINT`: required; the test returns early when unset or empty.
/// * `ISELF_TEST_MINIO_BUCKET`: defaults to `iself-test`. The test does not
///   create the bucket, so it must already exist.
/// * `ISELF_TEST_MINIO_ACCESS_KEY`: defaults to `minio`.
/// * `ISELF_TEST_MINIO_SECRET_KEY`: defaults to `minio-secret`.
///
/// Objects are written under a fresh `test-runs/<uuid>` prefix and are not
/// deleted afterwards.
#[tokio::test]
#[ignore]
async fn sync_round_trip_against_minio() {
    let endpoint = match std::env::var("ISELF_TEST_MINIO_ENDPOINT") {
        Ok(v) if !v.is_empty() => v,
        _ => {
            eprintln!(
                "skipping: ISELF_TEST_MINIO_ENDPOINT unset. \
                 See test docstring for setup instructions."
            );
            return;
        }
    };
    let bucket =
        std::env::var("ISELF_TEST_MINIO_BUCKET").unwrap_or_else(|_| "iself-test".to_string());
    let access_key =
        std::env::var("ISELF_TEST_MINIO_ACCESS_KEY").unwrap_or_else(|_| "minio".to_string());
    let secret_key = std::env::var("ISELF_TEST_MINIO_SECRET_KEY")
        .unwrap_or_else(|_| "minio-secret".to_string());
    // Unique prefix per run so concurrent or repeated runs cannot collide.
    let prefix = format!("test-runs/{}", uuid::Uuid::new_v4());

    // Local fixture: one top-level file and one nested file.
    let upload_dir = tempfile::tempdir().unwrap();
    tokio::fs::write(upload_dir.path().join("alpha.txt"), b"alpha-content")
        .await
        .unwrap();
    tokio::fs::create_dir_all(upload_dir.path().join("nested"))
        .await
        .unwrap();
    tokio::fs::write(
        upload_dir.path().join("nested").join("beta.txt"),
        b"beta-content",
    )
    .await
    .unwrap();

    let push_config = SyncConfig {
        enabled: true,
        provider: SyncProvider::S3,
        bucket: bucket.clone(),
        endpoint: Some(endpoint.clone()),
        region: "us-east-1".to_string(),
        access_key_id: Some(access_key.clone()),
        secret_access_key: Some(secret_key.clone()),
        prefix: Some(prefix.clone()),
        force_path_style: true,
        auto_sync: false,
        sync_interval_minutes: 60,
    };

    // Push.
    let pusher = CloudSync::new(upload_dir.path().to_path_buf(), push_config.clone());
    let push_status = pusher.sync().await.expect("push failed");
    assert_eq!(
        push_status.synced_files, 2,
        "expected 2 files uploaded, got {:?}",
        push_status
    );
    assert_eq!(push_status.pending_files, 0);
    assert!(push_status.errors.is_empty(), "errors: {:?}", push_status.errors);

    // Pull into a fresh directory and compare contents.
    let download_dir = tempfile::tempdir().unwrap();
    let puller = CloudSync::new(download_dir.path().to_path_buf(), push_config);
    let pull_status = puller.download().await.expect("pull failed");
    assert_eq!(
        pull_status.synced_files, 2,
        "expected 2 objects downloaded, got {:?}",
        pull_status
    );
    // Same checks as the push half: a pull with failures must not pass.
    assert_eq!(pull_status.pending_files, 0);
    assert!(pull_status.errors.is_empty(), "errors: {:?}", pull_status.errors);

    let alpha = tokio::fs::read(download_dir.path().join("alpha.txt"))
        .await
        .expect("alpha missing after pull");
    assert_eq!(alpha, b"alpha-content");
    let beta = tokio::fs::read(download_dir.path().join("nested").join("beta.txt"))
        .await
        .expect("beta missing after pull");
    assert_eq!(beta, b"beta-content");
}
}