use crate::query::Query;
use crate::resources::v1::containers::models::V1ContainerStatus;
use crate::resources::v1::volumes::models::V1VolumeDriver;
use sea_orm::{DatabaseConnection, DbErr};
use serde::{Deserialize, Serialize};
use serde_json::from_str;
use std::collections::HashMap;
use std::error::Error;
use std::fs;
use std::path::Path;
use std::process::Command;
use std::process::Stdio;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command as TokioCommand;
/// Sync configuration: the set of source/destination pairs to synchronize
/// with rclone, plus the symlinks that should point at synced sources.
///
/// The type is (de)serialized with serde; this file reads and writes it as
/// YAML and also parses it from JSON stored on containers.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct VolumeConfig {
/// Source/destination pairs to synchronize.
pub paths: Vec<VolumePath>,
/// Cache directory; filled in by `default_cache_dir` when the field is
/// absent from the serialized form.
#[serde(default = "default_cache_dir")]
pub cache_dir: String,
/// Symlinks to create for configured sources; empty when absent.
#[serde(default)]
pub symlinks: Vec<SymlinkConfig>,
}
/// A single symlink request: `symlink_path` should become a symlink that
/// points at `source`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SymlinkConfig {
/// Target the symlink points at; must match the `source` of a configured
/// path when added through `VolumeConfig::add_symlink_config`.
pub source: String,
/// Filesystem location where the symlink itself is created.
pub symlink_path: String,
}
/// One source/destination pair handled by rclone.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct VolumePath {
/// Source location (local path or an `s3:` / `s3://` remote).
pub source: String,
/// Destination location (local path or an `s3:` / `s3://` remote).
pub dest: String,
/// Requests a `--resync` run for bisync paths; the sync routines in this
/// file reset it to `false` in the config file after a successful run.
/// Defaults to `false` when absent.
#[serde(default)]
pub resync: bool,
/// Whether the path is managed by the continuous sync loop (`true`) or
/// only by the one-time non-continuous sync (`false`). Defaults to `true`.
#[serde(default = "default_continuous")]
pub continuous: bool,
/// Which rclone operation is used for this path; filled in by
/// `default_volume_driver` when absent.
#[serde(default = "default_volume_driver")]
pub driver: V1VolumeDriver,
}
/// Serde default for `VolumePath::driver`: plain one-way `rclone sync`.
fn default_volume_driver() -> V1VolumeDriver {
V1VolumeDriver::RCLONE_SYNC
}
/// Serde default for `VolumePath::continuous`: paths are continuous unless
/// the config says otherwise.
fn default_continuous() -> bool {
true
}
/// Serde default for `VolumeConfig::cache_dir`.
fn default_cache_dir() -> String {
    // A plain conversion; `format!` without arguments only adds overhead.
    "/cache/rclone".to_string()
}
/// Loading, saving and in-memory editing of a sync configuration.
impl VolumeConfig {
    /// Loads a configuration from the YAML file at `path`.
    ///
    /// # Errors
    /// Returns an error when the file does not exist, cannot be read, or
    /// does not contain valid YAML for this type.
    pub fn read_from_file(path: &str) -> Result<Self, Box<dyn Error>> {
        if !Path::new(path).exists() {
            return Err(format!("Config file not found: {}", path).into());
        }
        let yaml_content = fs::read_to_string(path)?;
        let config: VolumeConfig = serde_yaml::from_str(&yaml_content)
            .map_err(|e| format!("Failed to parse YAML: {}", e))?;
        Ok(config)
    }

    /// Serializes the configuration to YAML and writes it to `path`,
    /// creating missing parent directories first.
    ///
    /// # Errors
    /// Returns an error when serialization, directory creation or the write
    /// itself fails.
    pub fn write_to_file(&self, path: &str) -> Result<(), Box<dyn Error>> {
        let yaml_content = serde_yaml::to_string(self)
            .map_err(|e| format!("Failed to serialize to YAML: {}", e))?;
        if let Some(parent) = Path::new(path).parent() {
            fs::create_dir_all(parent)?;
        }
        fs::write(path, yaml_content)?;
        Ok(())
    }

    /// Creates an empty configuration with the default cache directory.
    pub fn new() -> Self {
        VolumeConfig {
            paths: Vec::new(),
            cache_dir: default_cache_dir(),
            symlinks: Vec::new(),
        }
    }

    /// Appends a new source/destination pair to the configuration.
    pub fn add_path(
        &mut self,
        source: String,
        dest: String,
        resync: bool,
        driver: V1VolumeDriver,
        continuous: bool,
    ) {
        self.paths.push(VolumePath {
            source,
            dest,
            resync,
            continuous,
            driver,
        });
    }

    /// Removes the path at `index`.
    ///
    /// # Errors
    /// Returns an error when `index` is not a valid position in `paths`.
    pub fn remove_path(&mut self, index: usize) -> Result<(), Box<dyn Error>> {
        if index >= self.paths.len() {
            // `len() - 1` underflows on an empty list, so the "max" hint is
            // only produced when there is at least one path.
            let message = match self.paths.len().checked_sub(1) {
                Some(max) => format!("Index {} out of bounds (max: {})", index, max),
                None => format!("Index {} out of bounds (no paths configured)", index),
            };
            return Err(message.into());
        }
        self.paths.remove(index);
        Ok(())
    }

    /// Returns every path as a `(source, dest, resync, driver, continuous)`
    /// tuple, in configuration order.
    pub fn list_paths(&self) -> Vec<(&String, &String, bool, V1VolumeDriver, bool)> {
        self.paths
            .iter()
            .map(|path| {
                (
                    &path.source,
                    &path.dest,
                    path.resync,
                    path.driver.clone(),
                    path.continuous,
                )
            })
            .collect()
    }

    /// Registers a symlink at `symlink_path` pointing at `source`.
    ///
    /// # Errors
    /// Returns an error when `source` is not the source of any configured
    /// path, or when the exact same symlink is already registered.
    pub fn add_symlink_config(
        &mut self,
        source: String,
        symlink_path: String,
    ) -> Result<(), Box<dyn Error>> {
        let source_exists = self.paths.iter().any(|path| path.source == source);
        if !source_exists {
            return Err(format!(
                "Source path '{}' does not exist in the configuration",
                source
            )
            .into());
        }
        let already_registered = self
            .symlinks
            .iter()
            .any(|existing| existing.source == source && existing.symlink_path == symlink_path);
        if already_registered {
            return Err(format!(
                "Symlink from '{}' to '{}' already exists",
                source, symlink_path
            )
            .into());
        }
        self.symlinks.push(SymlinkConfig {
            source,
            symlink_path,
        });
        Ok(())
    }

    /// Removes the first symlink entry matching both `source` and
    /// `symlink_path`.
    ///
    /// # Errors
    /// Returns an error when no such entry exists.
    pub fn remove_symlink(
        &mut self,
        source: &str,
        symlink_path: &str,
    ) -> Result<(), Box<dyn Error>> {
        let position = self
            .symlinks
            .iter()
            .position(|config| config.source == source && config.symlink_path == symlink_path);
        match position {
            Some(index) => {
                self.symlinks.remove(index);
                Ok(())
            }
            None => {
                Err(format!("Symlink from '{}' to '{}' not found", source, symlink_path).into())
            }
        }
    }

    /// Returns every registered symlink as a `(source, symlink_path)` pair.
    pub fn list_all_symlinks(&self) -> Vec<(&str, &str)> {
        self.symlinks
            .iter()
            .map(|config| (config.source.as_str(), config.symlink_path.as_str()))
            .collect()
    }

    /// Returns the symlink paths registered for `source`.
    pub fn get_symlinks_for_source(&self, source: &str) -> Vec<&str> {
        self.symlinks
            .iter()
            .filter(|config| config.source == source)
            .map(|config| config.symlink_path.as_str())
            .collect()
    }
}
/// Runs forever, re-reading the configuration at `config_path` every
/// `interval_seconds` and keeping one rclone subprocess per continuous path.
///
/// On each iteration the loop:
/// 1. loads the config (creating an empty one when it is missing and
///    `create_if_missing` is set; any failure just waits for the next tick),
/// 2. forgets subprocesses that have exited,
/// 3. kills subprocesses whose path is no longer in the config,
/// 4. (re)starts a subprocess for every continuous path that has none or
///    that is flagged `resync`,
/// 5. clears the `resync` flag in the config file for every path that was
///    successfully restarted because of it.
///
/// The function only returns if the surrounding task is cancelled; the
/// `Result` return type is kept for interface compatibility.
pub async fn execute_continuous_sync(
    config_path: String,
    interval_seconds: u64,
    create_if_missing: bool,
) -> Result<(), Box<dyn Error>> {
    println!(
        "Starting continuous sync from configuration: {}",
        config_path
    );
    println!("Sync interval: {} seconds", interval_seconds);
    let interval = tokio::time::Duration::from_secs(interval_seconds);
    // Keyed by (source, dest) exactly as written in the config file.
    let mut running_processes: HashMap<(String, String), tokio::process::Child> = HashMap::new();
    loop {
        // Load (or bootstrap) the configuration; `None` means "try again later".
        let loaded = match VolumeConfig::read_from_file(&config_path) {
            Ok(config) => Some(config),
            Err(read_err) => {
                if !Path::new(&config_path).exists() && create_if_missing {
                    match create_empty_config(&config_path) {
                        Ok(_) => Some(VolumeConfig::new()),
                        Err(create_err) => {
                            println!(
                                "Failed to create config file: {}. Will retry on next interval.",
                                create_err
                            );
                            None
                        }
                    }
                } else {
                    println!(
                        "Error reading config: {}. Will retry on next interval.",
                        read_err
                    );
                    None
                }
            }
        };
        let current_config = match loaded {
            Some(config) => config,
            None => {
                tokio::time::sleep(interval).await;
                continue;
            }
        };

        // Drop bookkeeping for subprocesses that have already exited so they
        // are started again below.
        let mut finished = Vec::new();
        for (path_key, child) in &mut running_processes {
            match child.try_wait() {
                Ok(Some(status)) => {
                    println!(
                        "Rclone subprocess for {} ⟷ {} exited with code: {:?}",
                        path_key.0,
                        path_key.1,
                        status.code()
                    );
                    finished.push(path_key.clone());
                }
                Ok(None) => {}
                Err(e) => {
                    println!(
                        "Error checking status of {} ⟷ {}: {}",
                        path_key.0, path_key.1, e
                    );
                }
            }
        }
        for path_key in finished {
            running_processes.remove(&path_key);
        }

        // Stop subprocesses whose path disappeared from the configuration.
        let removed: Vec<(String, String)> = running_processes
            .keys()
            .filter(|key| {
                !current_config
                    .paths
                    .iter()
                    .any(|path| path.source == key.0 && path.dest == key.1)
            })
            .cloned()
            .collect();
        for path_key in removed {
            println!(
                "Path removed from config: {} ⟷ {}, stopping sync process",
                path_key.0, path_key.1
            );
            if let Some(mut process) = running_processes.remove(&path_key) {
                // `Child::kill` is async in tokio; it must be awaited or the
                // subprocess keeps running.
                if let Err(e) = process.kill().await {
                    println!(
                        "Failed to stop sync process for {} ⟷ {}: {}",
                        path_key.0, path_key.1, e
                    );
                }
            }
        }

        // Start (or restart) a subprocess for every continuous path that needs one.
        let mut resynced: Vec<(String, String)> = Vec::new();
        for path in &current_config.paths {
            if !path.continuous {
                continue;
            }
            let path_key = (path.source.clone(), path.dest.clone());
            if !path.resync && running_processes.contains_key(&path_key) {
                continue;
            }
            if let Some(mut process) = running_processes.remove(&path_key) {
                println!(
                    "Stopping existing sync process for {} ⟷ {}",
                    path_key.0, path_key.1
                );
                if let Err(e) = process.kill().await {
                    println!(
                        "Failed to stop sync process for {} ⟷ {}: {}",
                        path_key.0, path_key.1, e
                    );
                }
            }
            match start_sync_process(path, &current_config.cache_dir).await {
                Ok(process) => {
                    println!("Started sync process for {} ⟷ {}", path.source, path.dest);
                    if path.resync {
                        resynced.push(path_key.clone());
                    }
                    running_processes.insert(path_key, process);
                }
                Err(e) => {
                    println!(
                        "Failed to start sync process for {} ⟷ {}: {}",
                        path.source, path.dest, e
                    );
                }
            }
        }

        // Persist all cleared resync flags in a single write so that one
        // path's update cannot overwrite another's.
        if !resynced.is_empty() {
            let mut updated_config = current_config.clone();
            for p in &mut updated_config.paths {
                if resynced
                    .iter()
                    .any(|(source, dest)| *source == p.source && *dest == p.dest)
                {
                    p.resync = false;
                }
            }
            if let Err(e) = updated_config.write_to_file(&config_path) {
                println!("Failed to update config after resync: {}", e);
            }
        }

        println!(
            "Currently managing {} sync processes. Waiting for next check...",
            running_processes.len()
        );
        tokio::time::sleep(interval).await;
    }
}
/// Spawns an rclone subprocess for `path` and returns its handle.
///
/// Both endpoints are normalized (`s3://` → `s3:`) and created if missing
/// before rclone is started. The rclone subcommand follows the path's driver:
/// `bisync` (always with `--resync` and `--force`), `copy`, or `sync` for any
/// other driver. `__pycache__` directories are always excluded.
///
/// The child's stdout and stderr are forwarded line by line to this
/// process's stdout by two detached tasks. `_cache_dir` is currently unused.
///
/// # Errors
/// Returns an error when an endpoint cannot be prepared or rclone cannot be
/// spawned.
async fn start_sync_process(
    path: &VolumePath,
    _cache_dir: &str,
) -> Result<tokio::process::Child, Box<dyn Error>> {
    let source = normalize_s3_path(&path.source);
    let dest = normalize_s3_path(&path.dest);
    ensure_path_exists(&source).await?;
    ensure_path_exists(&dest).await?;

    let is_bisync = path.driver == V1VolumeDriver::RCLONE_BISYNC;
    let subcommand = if is_bisync {
        "bisync"
    } else if path.driver == V1VolumeDriver::RCLONE_COPY {
        "copy"
    } else {
        "sync"
    };

    let mut cmd = TokioCommand::new("rclone");
    cmd.arg(subcommand).arg(&source).arg(&dest);
    if is_bisync {
        // Bisync is always started with `--resync` here, so `path.resync`
        // does not need to add the flag a second time.
        cmd.arg("--resync");
    }
    cmd.arg("--exclude").arg("__pycache__/**");
    if is_bisync {
        cmd.arg("--force");
    }
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    let mut child = cmd.spawn()?;

    // Forward the child's output so it shows up in this process's log.
    if let Some(stdout) = child.stdout.take() {
        let source_label = path.source.clone();
        let dest_label = path.dest.clone();
        tokio::spawn(async move {
            let mut reader = BufReader::new(stdout).lines();
            while let Ok(Some(line)) = reader.next_line().await {
                println!(
                    "[rclone stdout: {} ⟷ {}] {}",
                    source_label, dest_label, line
                );
            }
        });
    }
    if let Some(stderr) = child.stderr.take() {
        let source_label = path.source.clone();
        let dest_label = path.dest.clone();
        tokio::spawn(async move {
            let mut reader = BufReader::new(stderr).lines();
            while let Ok(Some(line)) = reader.next_line().await {
                println!(
                    "[rclone stderr: {} ⟷ {}] {}",
                    source_label, dest_label, line
                );
            }
        });
    }
    Ok(child)
}
/// Converts a URL-style `s3://bucket/key` path into rclone's `s3:bucket/key`
/// remote syntax; any other path is returned unchanged.
fn normalize_s3_path(path: &str) -> String {
    match path.strip_prefix("s3://") {
        Some(rest) => format!("s3:{}", rest),
        None => path.to_string(),
    }
}
pub async fn execute_sync(
config_path: String,
create_if_missing: bool,
) -> Result<(), Box<dyn Error>> {
println!("Reading sync configuration from: {}", config_path);
if !Path::new(&config_path).exists() {
if create_if_missing {
create_empty_config(&config_path)?;
} else {
return Err(format!("Config file not found: {}", config_path).into());
}
}
let config = VolumeConfig::read_from_file(&config_path)?;
if config.paths.is_empty() {
println!("No paths to sync found in the configuration file.");
return Ok(());
}
println!("Found {} paths to sync", config.paths.len());
for (index, path) in config.paths.iter().enumerate() {
let sync_type = if path.driver == V1VolumeDriver::RCLONE_BISYNC {
"Bidirectional"
} else {
"Unidirectional"
};
println!(
"[{}/{}] {} syncing between {} and {}",
index + 1,
config.paths.len(),
sync_type,
path.source,
path.dest
);
let source = normalize_s3_path(&path.source);
let dest = normalize_s3_path(&path.dest);
ensure_path_exists(&source).await?;
ensure_path_exists(&dest).await?;
let mut cmd = Command::new("rclone");
let source = normalize_s3_path(&path.source);
let dest = normalize_s3_path(&path.dest);
if path.driver == V1VolumeDriver::RCLONE_BISYNC {
cmd.arg("bisync");
cmd.arg(&source);
cmd.arg(&dest);
if path.resync {
cmd.arg("--resync");
}
cmd.arg("--force");
} else if path.driver == V1VolumeDriver::RCLONE_COPY {
cmd.arg("copy");
cmd.arg(&source);
cmd.arg(&dest);
} else {
cmd.arg("sync");
cmd.arg(&source);
cmd.arg(&dest);
}
cmd.arg("--exclude").arg("__pycache__/**");
let output = cmd.output()?;
if output.status.success() {
println!(
"Successfully synced between {} and {}",
path.source, path.dest
);
if path.resync && path.driver == V1VolumeDriver::RCLONE_BISYNC {
let mut updated_config = VolumeConfig::read_from_file(&config_path)?;
if index < updated_config.paths.len() {
updated_config.paths[index].resync = false;
updated_config.write_to_file(&config_path)?;
}
}
} else {
let error = String::from_utf8_lossy(&output.stderr);
println!("Failed to sync: {}", error);
if error.contains("empty prior Path1 listing")
|| error.contains("Must run --resync to recover")
{
println!("Detected need for resync. Attempting to resync...");
let mut resync_cmd = Command::new("rclone");
resync_cmd.arg("bisync");
resync_cmd.arg(&source);
resync_cmd.arg(&dest);
resync_cmd.arg("--resync");
resync_cmd.arg("--force");
let resync_output = resync_cmd.output()?;
if resync_output.status.success() {
println!("Resync successful between {} and {}", source, dest);
} else {
let resync_error = String::from_utf8_lossy(&resync_output.stderr);
println!("Resync failed: {}", resync_error);
}
}
}
}
println!("Sync operation completed");
Ok(())
}
/// Writes a configuration with no paths and no symlinks to `path`.
///
/// # Errors
/// Propagates any failure from writing the file.
pub fn create_empty_config(path: &str) -> Result<(), Box<dyn Error>> {
    VolumeConfig::new().write_to_file(path)?;
    println!("Created empty sync configuration at: {}", path);
    Ok(())
}
/// Writes a sample configuration to `path` containing two placeholder
/// entries: a continuous bisync path with a pending resync, and a one-time
/// plain sync path.
///
/// # Errors
/// Propagates any failure from writing the file.
pub fn create_example_config(path: &str) -> Result<(), Box<dyn Error>> {
    let mut example = VolumeConfig::new();

    // Bidirectional, resync pending, continuous.
    example.add_path(
        String::from("/path/to/local/directory1"),
        String::from("s3:your-bucket/directory1"),
        true,
        V1VolumeDriver::RCLONE_BISYNC,
        true,
    );

    // One-way, no resync, run once.
    example.add_path(
        String::from("/path/to/local/directory2"),
        String::from("s3:your-bucket/directory2"),
        false,
        V1VolumeDriver::RCLONE_SYNC,
        false,
    );

    example.write_to_file(path)?;
    println!("Created example sync configuration at: {}", path);
    Ok(())
}
/// Appends a sync path to the configuration at `config_path`, starting from
/// an empty configuration when the file does not exist yet.
///
/// Bisync paths are added with their `resync` flag set; all other drivers
/// are added with it cleared.
///
/// # Errors
/// Returns an error when an existing config cannot be read or the updated
/// config cannot be written.
pub fn add_sync_path(
    config_path: &str,
    source: String,
    dest: String,
    volume_type: V1VolumeDriver,
    continuous: bool,
) -> Result<(), Box<dyn Error>> {
    let mut config = match Path::new(config_path).exists() {
        true => VolumeConfig::read_from_file(config_path)?,
        false => VolumeConfig::new(),
    };

    // Decide everything that depends on the driver before handing it over.
    let is_bisync = volume_type == V1VolumeDriver::RCLONE_BISYNC;
    config.add_path(
        source.clone(),
        dest.clone(),
        is_bisync,
        volume_type,
        continuous,
    );
    config.write_to_file(config_path)?;

    let sync_type = match is_bisync {
        true => "bidirectional",
        false => "unidirectional",
    };
    let sync_mode = match continuous {
        true => "continuous",
        false => "once",
    };
    println!(
        "Added {} {} sync path between {} and {}",
        sync_type, sync_mode, source, dest
    );
    Ok(())
}
/// Removes the sync path at `index` from the configuration at `config_path`
/// and writes the file back.
///
/// # Errors
/// Returns an error when the config cannot be read or written, or when
/// `index` does not refer to an existing path.
pub fn remove_sync_path(config_path: &str, index: usize) -> Result<(), Box<dyn Error>> {
    let mut config = VolumeConfig::read_from_file(config_path)?;

    // Remember what is being removed so it can be reported afterwards.
    let removed_pair = config
        .paths
        .get(index)
        .map(|entry| (entry.source.clone(), entry.dest.clone()));

    config.remove_path(index)?;
    config.write_to_file(config_path)?;

    if let Some((source, destination)) = removed_pair {
        println!("Removed sync path between {} and {}", source, destination);
    }
    Ok(())
}
/// Prints every sync path in the configuration at `config_path`, one per
/// line, with its index, direction, mode and pending-resync marker.
///
/// # Errors
/// Returns an error when the config cannot be read.
pub fn list_sync_paths(config_path: &str) -> Result<(), Box<dyn Error>> {
    let config = VolumeConfig::read_from_file(config_path)?;
    if config.paths.is_empty() {
        println!("No sync paths found in the configuration.");
        return Ok(());
    }

    println!("Sync paths in {}:", config_path);
    for (i, entry) in config.paths.iter().enumerate() {
        let is_bisync = entry.driver == V1VolumeDriver::RCLONE_BISYNC;
        let direction = match is_bisync {
            true => "bidirectional",
            false => "unidirectional",
        };
        let mode = match entry.continuous {
            true => "continuous",
            false => "once",
        };
        // The resync marker is only meaningful for bisync paths.
        let resync_status = match entry.resync && is_bisync {
            true => " (resync pending)",
            false => "",
        };
        println!(
            "[{}] {} ⟷ {} ({}, {}){}",
            i, entry.source, entry.dest, direction, mode, resync_status
        );
    }
    Ok(())
}
/// Registers a symlink at `symlink_path` that points at the source of the
/// sync path at `path_index`, then writes the configuration back.
///
/// # Errors
/// Returns an error when the config cannot be read or written, when
/// `path_index` is out of bounds, or when the symlink is already registered.
pub fn add_symlink_to_path(
    config_path: &str,
    path_index: usize,
    symlink_path: &str,
) -> Result<(), Box<dyn Error>> {
    let mut config = VolumeConfig::read_from_file(config_path)?;
    // Checked lookup: a bad index is reported as an error instead of panicking.
    let source = config
        .paths
        .get(path_index)
        .map(|entry| entry.source.clone())
        .ok_or_else(|| format!("Path index {} out of bounds", path_index))?;
    config.add_symlink_config(source, symlink_path.to_string())?;
    config.write_to_file(config_path)?;
    println!(
        "Added symlink {} to path index {}",
        symlink_path, path_index
    );
    Ok(())
}
/// Removes the `symlink_index`-th symlink registered for the source of the
/// sync path at `path_index`, then writes the configuration back.
///
/// `symlink_index` counts only the symlinks whose source matches that path.
///
/// # Errors
/// Returns an error when the config cannot be read or written, or when
/// either index is out of bounds.
pub fn remove_symlink_from_path(
    config_path: &str,
    path_index: usize,
    symlink_index: usize,
) -> Result<(), Box<dyn Error>> {
    let mut config = VolumeConfig::read_from_file(config_path)?;

    let source = match config.paths.get(path_index) {
        Some(entry) => entry.source.clone(),
        None => return Err(format!("Path index {} out of bounds", path_index).into()),
    };

    // Pick the requested entry among the symlinks that belong to this source.
    let symlink_path = config
        .symlinks
        .iter()
        .filter(|link| link.source == source)
        .nth(symlink_index)
        .map(|link| link.symlink_path.clone())
        .ok_or_else(|| format!("Symlink index {} out of bounds", symlink_index))?;

    config.remove_symlink(&source, &symlink_path)?;
    config.write_to_file(config_path)?;
    println!(
        "Removed symlink {} from path index {}",
        symlink_path, path_index
    );
    Ok(())
}
/// Prints the symlinks registered for the source of the sync path at
/// `path_index`.
///
/// # Errors
/// Returns an error when the config cannot be read or `path_index` is out
/// of bounds.
pub fn list_symlinks_for_path(config_path: &str, path_index: usize) -> Result<(), Box<dyn Error>> {
    let config = VolumeConfig::read_from_file(config_path)?;

    let source = match config.paths.get(path_index) {
        Some(entry) => &entry.source,
        None => return Err(format!("Path index {} out of bounds", path_index).into()),
    };

    let symlinks = config.get_symlinks_for_source(source);
    if symlinks.is_empty() {
        println!("No symlinks found for path index {}", path_index);
        return Ok(());
    }

    println!("Symlinks for path index {}:", path_index);
    for (i, symlink) in symlinks.into_iter().enumerate() {
        println!("[{}] {}", i, symlink);
    }
    Ok(())
}
/// Creates on disk every symlink listed in the configuration at
/// `config_path`.
///
/// For each entry:
/// - sources containing `:` are treated as remote and skipped,
/// - missing parent directories of the symlink are created,
/// - an existing symlink at the destination (including a dangling one) is
///   replaced; any other existing file or directory is left alone and
///   counted as a failure,
/// - the symlink is created (on Windows, as a directory or file symlink
///   depending on the source's metadata).
///
/// Per-entry failures are logged and counted; they do not abort the run and
/// are not reflected in the return value.
///
/// # Errors
/// Returns an error only when the config itself cannot be read.
pub fn create_symlinks_from_config(config_path: &str) -> Result<(), Box<dyn Error>> {
    let config = VolumeConfig::read_from_file(config_path)?;
    let mut created_count = 0;
    let mut error_count = 0;
    for symlink_config in &config.symlinks {
        if symlink_config.source.contains(":") {
            println!(
                "Skipping symlink for remote path: {}",
                symlink_config.source
            );
            continue;
        }
        let link_path = Path::new(&symlink_config.symlink_path);
        if let Some(parent) = link_path.parent() {
            if let Err(e) = fs::create_dir_all(parent) {
                println!(
                    "Failed to create parent directories for symlink {}: {}",
                    symlink_config.symlink_path, e
                );
                error_count += 1;
                continue;
            }
        }
        // `is_symlink` does not follow the link, so it also catches dangling
        // symlinks, which `exists` reports as absent.
        if link_path.is_symlink() {
            if let Err(e) = fs::remove_file(link_path) {
                println!(
                    "Failed to remove existing symlink {}: {}",
                    symlink_config.symlink_path, e
                );
                error_count += 1;
                continue;
            }
        } else if link_path.exists() {
            println!(
                "Destination path exists and is not a symlink: {}",
                symlink_config.symlink_path
            );
            error_count += 1;
            continue;
        }
        let result = {
            #[cfg(unix)]
            {
                std::os::unix::fs::symlink(&symlink_config.source, &symlink_config.symlink_path)
            }
            #[cfg(windows)]
            {
                // Windows distinguishes directory and file symlinks.
                match fs::metadata(&symlink_config.source) {
                    Ok(metadata) => {
                        if metadata.is_dir() {
                            std::os::windows::fs::symlink_dir(
                                &symlink_config.source,
                                &symlink_config.symlink_path,
                            )
                        } else {
                            std::os::windows::fs::symlink_file(
                                &symlink_config.source,
                                &symlink_config.symlink_path,
                            )
                        }
                    }
                    Err(e) => Err(e),
                }
            }
        };
        match result {
            Ok(_) => {
                println!(
                    "Created symlink from {} to {}",
                    symlink_config.source, symlink_config.symlink_path
                );
                created_count += 1;
            }
            Err(e) => {
                println!(
                    "Failed to create symlink from {} to {}: {}",
                    symlink_config.source, symlink_config.symlink_path, e
                );
                error_count += 1;
            }
        }
    }
    println!(
        "Symlink creation completed: {} created, {} failed",
        created_count, error_count
    );
    Ok(())
}
/// Registers a symlink from `source` to `symlink_path` in the configuration
/// at `config_path` and writes the file back.
///
/// # Errors
/// Returns an error when the config cannot be read or written, or when the
/// symlink is rejected by `VolumeConfig::add_symlink_config`.
pub fn add_symlink(
    config_path: &str,
    source: &str,
    symlink_path: &str,
) -> Result<(), Box<dyn Error>> {
    let mut updated = VolumeConfig::read_from_file(config_path)?;
    let owned_source = String::from(source);
    let owned_link = String::from(symlink_path);
    updated.add_symlink_config(owned_source, owned_link)?;
    updated.write_to_file(config_path)?;
    println!("Added symlink from {} to {}", source, symlink_path);
    Ok(())
}
/// Removes the symlink entry matching `source` and `symlink_path` from the
/// configuration at `config_path` and writes the file back.
///
/// # Errors
/// Returns an error when the config cannot be read or written, or when no
/// matching entry exists.
pub fn remove_symlink(
    config_path: &str,
    source: &str,
    symlink_path: &str,
) -> Result<(), Box<dyn Error>> {
    let mut updated = VolumeConfig::read_from_file(config_path)?;
    updated
        .remove_symlink(source, symlink_path)
        .and_then(|()| updated.write_to_file(config_path))?;
    println!("Removed symlink from {} to {}", source, symlink_path);
    Ok(())
}
/// Prints every symlink registered in the configuration at `config_path`
/// as `[index] source -> symlink_path`.
///
/// # Errors
/// Returns an error when the config cannot be read.
pub fn list_symlinks(config_path: &str) -> Result<(), Box<dyn Error>> {
    let config = VolumeConfig::read_from_file(config_path)?;
    if config.symlinks.is_empty() {
        println!("No symlinks found in the configuration.");
        return Ok(());
    }

    println!("Symlinks in the configuration:");
    for (i, link) in config.symlinks.iter().enumerate() {
        println!("[{}] {} -> {}", i, link.source, link.symlink_path);
    }
    Ok(())
}
/// Prints the symlink paths registered for `source` in the configuration at
/// `config_path`.
///
/// # Errors
/// Returns an error when the config cannot be read.
pub fn list_symlinks_for_source(config_path: &str, source: &str) -> Result<(), Box<dyn Error>> {
    let config = VolumeConfig::read_from_file(config_path)?;

    let matching: Vec<&str> = config
        .symlinks
        .iter()
        .filter(|link| link.source == source)
        .map(|link| link.symlink_path.as_str())
        .collect();

    if matching.is_empty() {
        println!("No symlinks found for source path: {}", source);
        return Ok(());
    }

    println!("Symlinks for source path: {}", source);
    for (i, symlink_path) in matching.into_iter().enumerate() {
        println!("[{}] {}", i, symlink_path);
    }
    Ok(())
}
pub async fn execute_non_continuous_sync(
config_path: &str,
create_if_missing: bool,
) -> Result<(), Box<dyn Error>> {
println!(
"Executing one-time sync for non-continuous paths from: {}",
config_path
);
if !Path::new(config_path).exists() {
if create_if_missing {
create_empty_config(config_path)?;
} else {
return Err(format!("Config file not found: {}", config_path).into());
}
}
let config = VolumeConfig::read_from_file(config_path)?;
let once_paths: Vec<_> = config
.paths
.iter()
.filter(|path| !path.continuous)
.collect();
if once_paths.is_empty() {
println!("No non-continuous paths found in the configuration.");
return Ok(());
}
println!("Found {} non-continuous paths to sync", once_paths.len());
for (index, path) in once_paths.iter().enumerate() {
let sync_type = if path.driver.clone() == V1VolumeDriver::RCLONE_BISYNC {
"Bidirectional"
} else {
"Unidirectional"
};
println!(
"[{}/{}] {} syncing between {} and {}",
index + 1,
once_paths.len(),
sync_type,
path.source,
path.dest
);
let source = normalize_s3_path(&path.source);
let dest = normalize_s3_path(&path.dest);
ensure_path_exists(&source).await?;
ensure_path_exists(&dest).await?;
let mut cmd = Command::new("rclone");
let source = normalize_s3_path(&path.source);
let dest = normalize_s3_path(&path.dest);
if path.driver.clone() == V1VolumeDriver::RCLONE_BISYNC {
cmd.arg("bisync");
cmd.arg(&source);
cmd.arg(&dest);
cmd.arg("--resync");
cmd.arg("--force");
} else {
cmd.arg("sync");
cmd.arg(&source);
cmd.arg(&dest);
}
cmd.arg("--exclude").arg("__pycache__/**");
let output = cmd.output()?;
if output.status.success() {
println!("Successfully synced between {} and {}", source, dest);
if path.resync && path.driver.clone() == V1VolumeDriver::RCLONE_BISYNC {
if let Some(original_index) = config
.paths
.iter()
.position(|p| p.source == path.source && p.dest == path.dest)
{
let mut updated_config = VolumeConfig::read_from_file(config_path)?;
if original_index < updated_config.paths.len() {
updated_config.paths[original_index].resync = false;
updated_config.write_to_file(config_path)?;
}
}
}
} else {
let error = String::from_utf8_lossy(&output.stderr);
println!("Failed to sync: {}", error);
if error.contains("empty prior Path1 listing")
|| error.contains("Must run --resync to recover")
{
println!("Detected need for resync. Attempting to resync...");
let mut resync_cmd = Command::new("rclone");
resync_cmd.arg("bisync");
resync_cmd.arg(&source);
resync_cmd.arg(&dest);
resync_cmd.arg("--resync");
resync_cmd.arg("--force");
let resync_output = resync_cmd.output()?;
if resync_output.status.success() {
println!("Resync successful between {} and {}", source, dest);
} else {
let resync_error = String::from_utf8_lossy(&resync_output.stderr);
println!("Resync failed: {}", resync_error);
}
}
}
}
println!("Non-continuous sync operation completed");
Ok(())
}
/// Reports whether any bisync path with an S3 source in `config1` overlaps
/// one in `config2`.
///
/// Only paths whose driver is bisync and whose `source` starts with `s3:`
/// (which includes `s3://`) are considered. Two sources overlap when, after
/// normalization and with a trailing `/` appended, one is a prefix of the
/// other — i.e. they are the same location or one contains the other.
pub fn has_overlapping_s3_bidirectional_sync(
    config1: &VolumeConfig,
    config2: &VolumeConfig,
) -> bool {
    // Collects the slash-terminated S3 sources of all bisync paths.
    fn bisync_s3_prefixes(config: &VolumeConfig) -> Vec<String> {
        config
            .paths
            .iter()
            .filter(|entry| {
                entry.driver == V1VolumeDriver::RCLONE_BISYNC && entry.source.starts_with("s3:")
            })
            .map(|entry| {
                let mut prefix = normalize_s3_path(&entry.source);
                if !prefix.ends_with('/') {
                    prefix.push('/');
                }
                prefix
            })
            .collect()
    }

    let left = bisync_s3_prefixes(config1);
    let right = bisync_s3_prefixes(config2);

    // Equal locations are covered too: a string is a prefix of itself.
    left.iter().any(|a| {
        right
            .iter()
            .any(|b| a.starts_with(b.as_str()) || b.starts_with(a.as_str()))
    })
}
/// Returns the ids of active containers whose volume configuration has an
/// S3 bisync source overlapping one in `new_config`.
///
/// A container is considered only when:
/// - its id differs from `exclude_container_id` (if given),
/// - its status JSON parses and the lower-cased status is one of `running`,
///   `queued`, `started`, `waiting`, `pending` or `suspended`,
/// - its volumes JSON parses as a `VolumeConfig`.
///
/// Containers failing any of these checks are silently skipped.
///
/// # Errors
/// Returns the database error when the containers cannot be loaded.
pub async fn find_active_containers_with_overlapping_s3_sync(
    db: &DatabaseConnection,
    new_config: &VolumeConfig,
    exclude_container_id: Option<&str>,
) -> Result<Vec<String>, DbErr> {
    const ACTIVE_STATUSES: [&str; 6] = [
        "running",
        "queued",
        "started",
        "waiting",
        "pending",
        "suspended",
    ];

    let mut overlapping_container_ids = Vec::new();
    for container in Query::find_all_containers(db).await? {
        let is_excluded =
            exclude_container_id.map_or(false, |exclude_id| container.id == exclude_id);
        if is_excluded {
            continue;
        }

        // Only containers with a parseable, active status are relevant.
        let status_json = match &container.status {
            Some(json) => json,
            None => continue,
        };
        let status_str = match serde_json::from_value::<V1ContainerStatus>(status_json.clone()) {
            Ok(parsed) => match parsed.status {
                Some(s) => s.to_lowercase(),
                None => continue,
            },
            Err(_) => continue,
        };
        if !ACTIVE_STATUSES.contains(&status_str.as_str()) {
            continue;
        }

        if let Some(volumes_json) = &container.volumes {
            if let Ok(existing_config) = from_str::<VolumeConfig>(&volumes_json.to_string()) {
                if has_overlapping_s3_bidirectional_sync(new_config, &existing_config) {
                    overlapping_container_ids.push(container.id.clone());
                }
            }
        }
    }
    Ok(overlapping_container_ids)
}
/// Fails when `new_config` has an S3 bisync source that overlaps one used by
/// another active container.
///
/// # Errors
/// Returns `DbErr::Custom` listing the conflicting container ids, or the
/// underlying database error when the lookup itself fails.
pub async fn validate_no_overlapping_s3_syncs(
    db: &DatabaseConnection,
    new_config: &VolumeConfig,
    exclude_container_id: Option<&str>,
) -> Result<(), DbErr> {
    let conflicting =
        find_active_containers_with_overlapping_s3_sync(db, new_config, exclude_container_id)
            .await?;
    if !conflicting.is_empty() {
        return Err(DbErr::Custom(format!(
            "The volume configuration has overlapping S3 bidirectional syncs with existing containers: {}",
            conflicting.join(", ")
        )));
    }
    Ok(())
}
/// Builds a temporary rclone config file from environment variables and
/// points `RCLONE_CONFIG` at it.
///
/// Nothing is done (and `Ok(None)` is returned) unless
/// `RCLONE_CONFIG_S3REMOTE_TYPE` or `AWS_ACCESS_KEY_ID` is set. Otherwise the
/// file `rclone_config.conf` in the system temp directory is (re)created with
/// an `[s3]` section containing:
/// - `type = s3`,
/// - `region` from `RCLONE_CONFIG_S3REMOTE_REGION`, falling back to
///   `AWS_REGION`,
/// - `provider` from `RCLONE_CONFIG_S3REMOTE_PROVIDER`,
/// - `env_auth` from `RCLONE_CONFIG_S3REMOTE_ENV_AUTH`, or `true` when both
///   AWS key variables are present.
///
/// Returns the open file handle on success.
///
/// # Errors
/// Returns an error when the file cannot be created or written.
pub fn setup_rclone_config_from_env() -> Result<Option<std::fs::File>, Box<dyn Error>> {
    use std::io::Write;

    let env_is_set = |name: &str| std::env::var(name).is_ok();
    if !(env_is_set("RCLONE_CONFIG_S3REMOTE_TYPE") || env_is_set("AWS_ACCESS_KEY_ID")) {
        return Ok(None);
    }

    let temp_path = std::env::temp_dir().join("rclone_config.conf");
    let mut temp_file = std::fs::File::create(&temp_path)?;
    writeln!(temp_file, "[s3]")?;
    writeln!(temp_file, "type = s3")?;

    // The rclone-specific variable wins over the generic AWS one.
    let region = std::env::var("RCLONE_CONFIG_S3REMOTE_REGION")
        .or_else(|_| std::env::var("AWS_REGION"));
    if let Ok(region) = region {
        writeln!(temp_file, "region = {}", region)?;
    }

    if let Ok(provider) = std::env::var("RCLONE_CONFIG_S3REMOTE_PROVIDER") {
        writeln!(temp_file, "provider = {}", provider)?;
    }

    match std::env::var("RCLONE_CONFIG_S3REMOTE_ENV_AUTH") {
        Ok(env_auth) => writeln!(temp_file, "env_auth = {}", env_auth)?,
        Err(_) if env_is_set("AWS_ACCESS_KEY_ID") && env_is_set("AWS_SECRET_ACCESS_KEY") => {
            writeln!(temp_file, "env_auth = true")?
        }
        Err(_) => {}
    }

    std::env::set_var("RCLONE_CONFIG", temp_path.to_string_lossy().to_string());
    println!("Created temporary rclone configuration from environment variables");
    Ok(Some(temp_file))
}
/// Makes sure the S3 "directory" named by `path` exists, creating a marker
/// object in it when it does not.
///
/// `path` is expected in `s3:bucket/prefix` form. A path with no `/` (bucket
/// only) is accepted without doing anything. The directory is probed with
/// `rclone lsf`; if that fails, an empty `.rclone_directory_marker` file is
/// copied into it with `rclone copy`. A permission failure (403/Forbidden)
/// during creation is logged and treated as success.
///
/// # Errors
/// Returns an error when rclone cannot be launched, the temporary marker
/// cannot be prepared, or the copy fails for a reason other than permissions.
async fn create_s3_directory(path: &str) -> Result<(), Box<dyn Error>> {
    let without_remote = path.trim_start_matches("s3:");
    let (bucket, prefix) = match without_remote.split_once('/') {
        Some(pair) => pair,
        // Bucket-only path: there is no directory to create.
        None => return Ok(()),
    };

    let mut directory_prefix = prefix.to_string();
    if !directory_prefix.ends_with('/') {
        directory_prefix.push('/');
    }
    let remote_dir = format!("s3:{}/{}", bucket, directory_prefix);

    println!("Checking S3 directory: s3:{}/{}", bucket, directory_prefix);
    let probe = TokioCommand::new("rclone")
        .arg("lsf")
        .arg(&remote_dir)
        .output()
        .await?;
    if probe.status.success() {
        return Ok(());
    }

    println!(
        "Attempting to create S3 directory: s3:{}/{}",
        bucket, directory_prefix
    );
    // S3 has no real directories; uploading an empty marker makes one appear.
    let scratch_dir = tempfile::tempdir()?;
    let marker_path = scratch_dir.path().join(".rclone_directory_marker");
    std::fs::write(&marker_path, "")?;
    let upload = TokioCommand::new("rclone")
        .arg("copy")
        .arg(&marker_path)
        .arg(&remote_dir)
        .output()
        .await?;
    if upload.status.success() {
        return Ok(());
    }

    let error = String::from_utf8_lossy(&upload.stderr);
    if error.contains("Forbidden") || error.contains("status code: 403") {
        println!("Note: Unable to create S3 directory marker due to permissions (this may be expected): s3:{}/{}", bucket, directory_prefix);
        return Ok(());
    }
    Err(format!("Failed to create S3 directory: {}", error).into())
}
/// Ensures that `path` exists: S3 paths are delegated to
/// `create_s3_directory`, local paths are created with all missing parents.
///
/// # Errors
/// Propagates failures from the S3 helper or from local directory creation.
async fn ensure_path_exists(path: &str) -> Result<(), Box<dyn Error>> {
    let normalized_path = normalize_s3_path(path);
    if normalized_path.starts_with("s3:") {
        return create_s3_directory(&normalized_path).await;
    }

    let local_dir = Path::new(&normalized_path);
    if !local_dir.exists() {
        println!("Creating local directory: {}", normalized_path);
        fs::create_dir_all(local_dir)?;
    }
    Ok(())
}
/// Runs `rclone check --one-way` from `source` to `dest` (excluding
/// `__pycache__`) using the config file named by `RCLONE_CONFIG`, echoes
/// rclone's stdout and stderr, and returns whether rclone exited
/// successfully.
///
/// # Errors
/// Returns an error when `RCLONE_CONFIG` is not set or rclone cannot be
/// launched.
pub async fn check_paths(source: &str, dest: &str) -> Result<bool, Box<dyn std::error::Error>> {
    use std::process::Stdio;
    use tokio::process::Command;

    let config_path = std::env::var("RCLONE_CONFIG").map_err(|_| {
        "No RCLONE_CONFIG environment variable set - cannot proceed with rclone check!"
    })?;

    let mut rclone = Command::new("rclone");
    rclone
        .arg("check")
        .arg("--config")
        .arg(&config_path)
        .arg(source)
        .arg(dest)
        .arg("--one-way")
        .arg("--exclude")
        .arg("__pycache__/**")
        .stderr(Stdio::piped())
        .stdout(Stdio::piped());
    let output = rclone.output().await?;

    println!("stdout:\n{}", String::from_utf8_lossy(&output.stdout));
    eprintln!("stderr:\n{}", String::from_utf8_lossy(&output.stderr));
    Ok(output.status.success())
}