use super::{FileInfo, TransferResult, Transport};
use crate::error::{Result, SyncError};
use crate::sync::scanner::{FileEntry, ScanOptions};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::ObjectStore;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::path::Path as ObjectPath;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
/// Transport backed by a Google Cloud Storage bucket, accessed through an
/// `object_store` client.
pub struct GcsTransport {
/// Shared handle to the object store client built for the bucket.
store: Arc<dyn ObjectStore>,
/// Key prefix that every object key is placed under; an empty string means
/// keys are used as-is, with no prefix prepended.
prefix: String,
}
impl GcsTransport {
pub async fn new(bucket: String, prefix: String, _project: Option<String>) -> Result<Self> {
let builder = GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);
let store = builder.build().map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to create GCS client: {}", e))))?;
Ok(Self { store: Arc::new(store), prefix })
}
fn path_to_object_path(&self, path: &Path) -> ObjectPath {
let path_str = path.to_string_lossy();
let path_str = path_str.trim_start_matches('/');
let key = if self.prefix.is_empty() {
path_str.to_string()
} else {
format!("{}/{}", self.prefix.trim_end_matches('/'), path_str)
};
ObjectPath::from(key)
}
fn object_path_to_path(&self, object_path: &ObjectPath) -> PathBuf {
let key = object_path.as_ref();
let key = if !self.prefix.is_empty() { key.strip_prefix(&self.prefix).unwrap_or(key).trim_start_matches('/') } else { key };
PathBuf::from(key)
}
}
#[async_trait]
impl Transport for GcsTransport {
fn set_scan_options(&mut self, _options: ScanOptions) {
}
async fn scan(&self, _path: &Path) -> Result<Vec<FileEntry>> {
use futures::stream::StreamExt;
let prefix = if self.prefix.is_empty() { None } else { Some(ObjectPath::from(self.prefix.clone())) };
let mut entries = Vec::new();
let mut list_stream = self.store.list(prefix.as_ref());
while let Some(meta) = list_stream.next().await {
let meta = meta.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to retrieve GCS object metadata: {}", e))))?;
let key = meta.location.as_ref();
let size = meta.size;
let modified = meta.last_modified.into();
let is_dir = key.ends_with('/');
entries.push(FileEntry {
path: Arc::new(PathBuf::from(key)),
relative_path: Arc::new(self.object_path_to_path(&meta.location)),
size,
modified,
is_dir,
is_symlink: false, symlink_target: None,
is_sparse: false,
allocated_size: size,
xattrs: None,
inode: None,
nlink: 1,
acls: None,
bsd_flags: None,
});
}
Ok(entries)
}
async fn scan_streaming(&self, _path: &Path) -> Result<BoxStream<'static, Result<FileEntry>>> {
use futures::stream::StreamExt;
let prefix = if self.prefix.is_empty() { None } else { Some(ObjectPath::from(self.prefix.clone())) };
let prefix_str = self.prefix.clone();
let stream = self.store.list(prefix.as_ref());
let mapped = stream.map(move |meta_res| {
let meta = meta_res.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to retrieve GCS object metadata: {}", e))))?;
let key = meta.location.as_ref();
let size = meta.size;
let modified = meta.last_modified.into();
let is_dir = key.ends_with('/');
let relative_key = if !prefix_str.is_empty() { key.strip_prefix(&prefix_str).unwrap_or(key).trim_start_matches('/') } else { key };
let relative_path = Arc::new(PathBuf::from(relative_key));
Ok(FileEntry {
path: Arc::clone(&relative_path),
relative_path,
size,
modified,
is_dir,
is_symlink: false, symlink_target: None,
is_sparse: false,
allocated_size: size,
xattrs: None,
inode: None,
nlink: 1,
acls: None,
bsd_flags: None,
})
});
Ok(mapped.boxed())
}
async fn exists(&self, path: &Path) -> Result<bool> {
let object_path = self.path_to_object_path(path);
let result = self.store.head(&object_path).await;
Ok(result.is_ok())
}
async fn metadata(&self, _path: &Path) -> Result<std::fs::Metadata> {
Err(SyncError::Io(std::io::Error::other("metadata() not supported for GCS, use file_info() instead")))
}
async fn file_info(&self, path: &Path) -> Result<FileInfo> {
let object_path = self.path_to_object_path(path);
let meta = self
.store
.head(&object_path)
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to get GCS object metadata: {}", e))))?;
Ok(FileInfo { size: meta.size, modified: meta.last_modified.into() })
}
async fn create_dir_all(&self, path: &Path) -> Result<()> {
let mut key_str = self.path_to_object_path(path).to_string();
if !key_str.ends_with('/') {
key_str.push('/');
}
let object_path = ObjectPath::from(key_str);
self.store
.put(&object_path, Bytes::new().into())
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to create GCS directory marker: {}", e))))?;
Ok(())
}
async fn copy_file(&self, source: &Path, dest: &Path) -> Result<TransferResult> {
use tokio::io::AsyncReadExt;
let metadata = tokio::fs::metadata(source).await?;
let size = metadata.len();
let object_path = self.path_to_object_path(dest);
const MULTIPART_THRESHOLD: u64 = 5 * 1024 * 1024;
if size < MULTIPART_THRESHOLD {
let data = tokio::fs::read(source).await?;
self.store
.put(&object_path, Bytes::from(data).into())
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to upload to GCS: {}", e))))?;
} else {
use object_store::WriteMultipart;
let mut file = tokio::fs::File::open(source).await?;
let upload = self
.store
.put_multipart(&object_path)
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to initiate multipart upload: {}", e))))?;
let mut writer = WriteMultipart::new(upload);
const BUFFER_SIZE: usize = 5 * 1024 * 1024;
let mut buffer = vec![0u8; BUFFER_SIZE];
loop {
let bytes_read = file.read(&mut buffer).await?;
if bytes_read == 0 {
break;
}
writer.write(&buffer[..bytes_read]);
}
writer
.finish()
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to complete multipart upload: {}", e))))?;
}
Ok(TransferResult::new(size))
}
async fn remove(&self, path: &Path, _is_dir: bool) -> Result<()> {
let object_path = self.path_to_object_path(path);
self.store
.delete(&object_path)
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to delete GCS object: {}", e))))?;
Ok(())
}
async fn create_hardlink(&self, _source: &Path, _dest: &Path) -> Result<()> {
Err(SyncError::Io(std::io::Error::other("Hardlinks not supported on GCS")))
}
async fn create_symlink(&self, _target: &Path, _dest: &Path) -> Result<()> {
Err(SyncError::Io(std::io::Error::other("Symlinks not supported on GCS")))
}
async fn read_file(&self, path: &Path) -> Result<Vec<u8>> {
let object_path = self.path_to_object_path(path);
let result = self
.store
.get(&object_path)
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to download from GCS: {}", e))))?;
let bytes = result
.bytes()
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to read GCS object body: {}", e))))?;
Ok(bytes.to_vec())
}
async fn write_file(&self, path: &Path, data: &[u8], _mtime: SystemTime) -> Result<()> {
let object_path = self.path_to_object_path(path);
self.store
.put(&object_path, Bytes::copy_from_slice(data).into())
.await
.map_err(|e| SyncError::Io(std::io::Error::other(format!("Failed to upload to GCS: {}", e))))?;
Ok(())
}
async fn get_mtime(&self, path: &Path) -> Result<SystemTime> {
let info = self.file_info(path).await?;
Ok(info.modified)
}
}