use super::{
validate_relative_pattern, WorkspaceDirEntry, WorkspaceError, WorkspaceFileSystem,
WorkspaceFileSystemExt, WorkspaceFileType, WorkspaceGlobRequest, WorkspaceGlobResult,
WorkspaceGrepRequest, WorkspaceGrepResult, WorkspacePath, WorkspaceResult, WorkspaceSearch,
WorkspaceVersionConflict, WorkspaceWriteOutcome,
};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use aws_credential_types::Credentials;
use aws_sdk_s3::config::{BehaviorVersion, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client;
use std::sync::Arc;
use std::time::Duration;
/// Region handed to the SDK when `S3BackendConfig::region` is not set.
const DEFAULT_REGION: &str = "us-east-1";
/// Default cap (10 MiB) on the advertised size of an object that may be read as text.
pub const DEFAULT_MAX_READ_BYTES: u64 = 10 * 1024 * 1024;
/// Default cap on the number of objects one recursive listing collects for glob/grep.
pub const DEFAULT_MAX_OBJECTS_SCANNED: usize = 500;
/// Default per-object size (1 MiB) above which grep skips an object.
pub const DEFAULT_MAX_GREP_BYTES_PER_OBJECT: u64 = 1024 * 1024;
/// Default number of objects grep reads concurrently.
pub const DEFAULT_SEARCH_CONCURRENCY: usize = 8;
/// Connection and limit settings used to build an [`S3WorkspaceBackend`].
///
/// Construct it with [`S3BackendConfig::new`] and refine it with the chained
/// setter methods. `Debug` is implemented by hand so that the secret access
/// key and session token never end up in logs.
#[derive(Clone)]
pub struct S3BackendConfig {
    /// Custom endpoint URL (e.g. for S3-compatible stores); `None` keeps the SDK default.
    pub endpoint: Option<String>,
    /// Region name; the backend falls back to its default region when `None`.
    pub region: Option<String>,
    /// Access key id of the static credentials.
    pub access_key_id: String,
    /// Secret access key of the static credentials (redacted in `Debug` output).
    pub secret_access_key: String,
    /// Optional session token of the static credentials (redacted in `Debug` output).
    pub session_token: Option<String>,
    /// Bucket that holds the workspace objects.
    pub bucket: String,
    /// Key prefix under which the workspace lives; the backend trims leading
    /// and trailing `/` from it.
    pub prefix: String,
    /// Whether the SDK should use path-style addressing.
    pub force_path_style: bool,
    /// Optional timeout for S3 requests; how it is enforced is up to the
    /// backend built from this config.
    pub request_timeout: Option<Duration>,
    /// Maximum advertised object size accepted by text reads; `None` lets the
    /// backend pick its default.
    pub max_read_bytes: Option<u64>,
    /// Whether glob/grep search should be offered by the backend.
    pub search_enabled: bool,
    /// Maximum number of objects a search listing may collect; `None` means default.
    pub max_objects_scanned: Option<usize>,
    /// Objects larger than this are skipped by grep; `None` means default.
    pub max_grep_bytes_per_object: Option<u64>,
    /// Number of objects grep reads concurrently; `None` means default.
    pub search_concurrency: Option<usize>,
}

impl std::fmt::Debug for S3BackendConfig {
    /// Formats every field except the credentials' secret parts, which are
    /// replaced by a `<redacted>` placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";
        f.debug_struct("S3BackendConfig")
            .field("endpoint", &self.endpoint)
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &REDACTED)
            // Keep the Some/None shape visible without revealing the token.
            .field("session_token", &self.session_token.as_ref().map(|_| REDACTED))
            .field("bucket", &self.bucket)
            .field("prefix", &self.prefix)
            .field("force_path_style", &self.force_path_style)
            .field("request_timeout", &self.request_timeout)
            .field("max_read_bytes", &self.max_read_bytes)
            .field("search_enabled", &self.search_enabled)
            .field("max_objects_scanned", &self.max_objects_scanned)
            .field("max_grep_bytes_per_object", &self.max_grep_bytes_per_object)
            .field("search_concurrency", &self.search_concurrency)
            .finish()
    }
}

impl S3BackendConfig {
    /// Creates a config with the mandatory bucket, prefix and static
    /// credentials; every optional setting starts unset and search is off.
    pub fn new(
        bucket: impl Into<String>,
        prefix: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
    ) -> Self {
        Self {
            endpoint: None,
            region: None,
            access_key_id: access_key_id.into(),
            secret_access_key: secret_access_key.into(),
            session_token: None,
            bucket: bucket.into(),
            prefix: prefix.into(),
            force_path_style: false,
            request_timeout: None,
            max_read_bytes: None,
            search_enabled: false,
            max_objects_scanned: None,
            max_grep_bytes_per_object: None,
            search_concurrency: None,
        }
    }

    /// Sets a custom endpoint URL.
    pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.endpoint = Some(endpoint.into());
        self
    }

    /// Sets the region name.
    pub fn region(mut self, region: impl Into<String>) -> Self {
        self.region = Some(region.into());
        self
    }

    /// Sets the session token that accompanies the static credentials.
    pub fn session_token(mut self, token: impl Into<String>) -> Self {
        self.session_token = Some(token.into());
        self
    }

    /// Enables or disables path-style addressing.
    pub fn force_path_style(mut self, enabled: bool) -> Self {
        self.force_path_style = enabled;
        self
    }

    /// Sets the request timeout.
    pub fn request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = Some(timeout);
        self
    }

    /// Sets the maximum object size accepted by text reads.
    pub fn max_read_bytes(mut self, bytes: u64) -> Self {
        self.max_read_bytes = Some(bytes);
        self
    }

    /// Turns glob/grep search on or off.
    pub fn enable_search(mut self, enabled: bool) -> Self {
        self.search_enabled = enabled;
        self
    }

    /// Sets the maximum number of objects a search listing may collect.
    pub fn max_objects_scanned(mut self, n: usize) -> Self {
        self.max_objects_scanned = Some(n);
        self
    }

    /// Sets the size above which grep skips an object.
    pub fn max_grep_bytes_per_object(mut self, bytes: u64) -> Self {
        self.max_grep_bytes_per_object = Some(bytes);
        self
    }

    /// Sets how many objects grep reads concurrently.
    pub fn search_concurrency(mut self, n: usize) -> Self {
        self.search_concurrency = Some(n);
        self
    }
}
#[derive(Debug, Clone)]
pub struct S3WorkspaceBackend {
client: Client,
bucket: String,
prefix: String,
max_read_bytes: u64,
search_enabled: bool,
max_objects_scanned: usize,
max_grep_bytes_per_object: u64,
search_concurrency: usize,
}
impl S3WorkspaceBackend {
pub fn new(config: S3BackendConfig) -> Self {
let credentials = Credentials::new(
config.access_key_id,
config.secret_access_key,
config.session_token,
None,
"a3s-code-static",
);
let mut builder = aws_sdk_s3::Config::builder()
.behavior_version(BehaviorVersion::latest())
.region(Region::new(
config.region.unwrap_or_else(|| DEFAULT_REGION.to_string()),
))
.credentials_provider(credentials)
.force_path_style(config.force_path_style);
if let Some(endpoint) = config.endpoint {
builder = builder.endpoint_url(endpoint);
}
let client = Client::from_conf(builder.build());
Self::with_client(client, config.bucket, config.prefix)
.with_max_read_bytes(config.max_read_bytes.unwrap_or(DEFAULT_MAX_READ_BYTES))
.with_search_enabled(config.search_enabled)
.with_max_objects_scanned(
config
.max_objects_scanned
.unwrap_or(DEFAULT_MAX_OBJECTS_SCANNED),
)
.with_max_grep_bytes_per_object(
config
.max_grep_bytes_per_object
.unwrap_or(DEFAULT_MAX_GREP_BYTES_PER_OBJECT),
)
.with_search_concurrency(
config
.search_concurrency
.unwrap_or(DEFAULT_SEARCH_CONCURRENCY),
)
}
pub fn with_client(
client: Client,
bucket: impl Into<String>,
prefix: impl Into<String>,
) -> Self {
Self {
client,
bucket: bucket.into(),
prefix: normalize_prefix(&prefix.into()),
max_read_bytes: DEFAULT_MAX_READ_BYTES,
search_enabled: false,
max_objects_scanned: DEFAULT_MAX_OBJECTS_SCANNED,
max_grep_bytes_per_object: DEFAULT_MAX_GREP_BYTES_PER_OBJECT,
search_concurrency: DEFAULT_SEARCH_CONCURRENCY,
}
}
pub fn with_max_read_bytes(mut self, bytes: u64) -> Self {
self.max_read_bytes = if bytes == 0 {
DEFAULT_MAX_READ_BYTES
} else {
bytes
};
self
}
pub fn max_read_bytes(&self) -> u64 {
self.max_read_bytes
}
pub fn with_search_enabled(mut self, enabled: bool) -> Self {
self.search_enabled = enabled;
self
}
pub fn search_enabled(&self) -> bool {
self.search_enabled
}
pub fn with_max_objects_scanned(mut self, n: usize) -> Self {
self.max_objects_scanned = if n == 0 {
DEFAULT_MAX_OBJECTS_SCANNED
} else {
n
};
self
}
pub fn max_objects_scanned(&self) -> usize {
self.max_objects_scanned
}
pub fn with_max_grep_bytes_per_object(mut self, bytes: u64) -> Self {
self.max_grep_bytes_per_object = if bytes == 0 {
DEFAULT_MAX_GREP_BYTES_PER_OBJECT
} else {
bytes
};
self
}
pub fn max_grep_bytes_per_object(&self) -> u64 {
self.max_grep_bytes_per_object
}
pub fn with_search_concurrency(mut self, n: usize) -> Self {
self.search_concurrency = if n == 0 {
DEFAULT_SEARCH_CONCURRENCY
} else {
n
};
self
}
pub fn search_concurrency(&self) -> usize {
self.search_concurrency
}
pub fn bucket(&self) -> &str {
&self.bucket
}
pub fn prefix(&self) -> &str {
&self.prefix
}
pub fn client(&self) -> &Client {
&self.client
}
fn key_for(&self, path: &WorkspacePath) -> String {
if path.is_root() {
self.prefix.clone()
} else if self.prefix.is_empty() {
path.as_str().to_string()
} else {
format!("{}/{}", self.prefix, path.as_str())
}
}
fn list_prefix_for(&self, path: &WorkspacePath) -> String {
if path.is_root() {
if self.prefix.is_empty() {
String::new()
} else {
format!("{}/", self.prefix)
}
} else if self.prefix.is_empty() {
format!("{}/", path.as_str())
} else {
format!("{}/{}/", self.prefix, path.as_str())
}
}
async fn get_object_text(&self, path: &WorkspacePath) -> WorkspaceResult<(String, String)> {
let key = self.key_for(path);
let start = std::time::Instant::now();
let send_result = self
.client
.get_object()
.bucket(&self.bucket)
.key(&key)
.send()
.await;
emit_s3_call_event(
"s3.get_object",
&self.bucket,
&key,
send_result
.as_ref()
.ok()
.and_then(|r| r.content_length())
.unwrap_or(0)
.max(0) as u64,
send_result.is_ok(),
start.elapsed(),
);
let resp = send_result.map_err(|e| classify_get_error(&self.bucket, &key, e))?;
validate_content_length(
resp.content_length(),
self.max_read_bytes,
&self.bucket,
&key,
)?;
let etag = resp
.e_tag()
.map(|s| s.to_string())
.ok_or_else(|| {
anyhow!(
"S3 object s3://{}/{} returned no ETag; cannot use compare-and-swap writes against this endpoint",
self.bucket,
key
)
})?;
let bytes = resp
.body
.collect()
.await
.map_err(|e| {
anyhow!(
"Failed to read S3 object body s3://{}/{}: {}",
self.bucket,
key,
e
)
})?
.into_bytes();
let content = String::from_utf8(bytes.to_vec()).map_err(|e| {
anyhow!(
"S3 object s3://{}/{} is not valid UTF-8: {}",
self.bucket,
key,
e
)
})?;
Ok((content, etag))
}
}
#[async_trait]
impl WorkspaceFileSystem for S3WorkspaceBackend {
    /// Returns the object's text, discarding the version tag.
    async fn read_text(&self, path: &WorkspacePath) -> WorkspaceResult<String> {
        self.get_object_text(path)
            .await
            .map(|(text, _version)| text)
    }

    /// Unconditionally uploads `content` as the object for `path` and reports
    /// the number of bytes and lines written.
    async fn write_text(
        &self,
        path: &WorkspacePath,
        content: &str,
    ) -> WorkspaceResult<WorkspaceWriteOutcome> {
        let object_key = self.key_for(path);
        let payload = ByteStream::from(content.as_bytes().to_vec());
        let payload_len = content.len() as u64;
        let started = std::time::Instant::now();
        let put_result = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(&object_key)
            .body(payload)
            .content_type("text/plain; charset=utf-8")
            .send()
            .await;
        emit_s3_call_event(
            "s3.put_object",
            &self.bucket,
            &object_key,
            payload_len,
            put_result.is_ok(),
            started.elapsed(),
        );
        match put_result {
            Ok(_) => Ok(WorkspaceWriteOutcome {
                bytes: content.len(),
                lines: content.lines().count(),
            }),
            Err(err) => Err(WorkspaceError::from(anyhow!(
                "Failed to write S3 object s3://{}/{}: {}",
                self.bucket,
                object_key,
                err
            ))),
        }
    }

    /// Lists the immediate children of the directory `path`.
    ///
    /// Common prefixes become directory entries and objects directly below
    /// the listing prefix become file entries. A non-root directory for which
    /// the listing returned nothing at all is reported as `NotFound`.
    async fn list_dir(&self, path: &WorkspacePath) -> WorkspaceResult<Vec<WorkspaceDirEntry>> {
        let listing_prefix = self.list_prefix_for(path);
        let mut children = Vec::<WorkspaceDirEntry>::new();
        // Counts every returned item, including ones that yield no entry.
        let mut seen = 0usize;
        let mut next_token: Option<String> = None;
        loop {
            let request = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(&listing_prefix)
                .delimiter("/");
            let request = match next_token.as_ref() {
                Some(token) => request.continuation_token(token),
                None => request,
            };
            let started = std::time::Instant::now();
            let page_result = request.send().await;
            let listed = match page_result.as_ref() {
                Ok(page) => page.contents().len() as u64 + page.common_prefixes().len() as u64,
                Err(_) => 0,
            };
            emit_s3_call_event(
                "s3.list_objects_v2",
                &self.bucket,
                &listing_prefix,
                listed,
                page_result.is_ok(),
                started.elapsed(),
            );
            let page = page_result
                .map_err(|e| classify_list_error(&self.bucket, &listing_prefix, e))?;

            for dir in page.common_prefixes() {
                seen += 1;
                let dir_name = dir
                    .prefix()
                    .and_then(|full| strip_dir_name(full, &listing_prefix));
                if let Some(name) = dir_name {
                    children.push(WorkspaceDirEntry {
                        name,
                        kind: WorkspaceFileType::Directory,
                        size: 0,
                    });
                }
            }

            for object in page.contents() {
                seen += 1;
                match object.key() {
                    // An object whose key equals the listing prefix is not a child.
                    Some(key) if key != listing_prefix => {
                        if let Some(name) = strip_file_name(key, &listing_prefix) {
                            children.push(WorkspaceDirEntry {
                                name,
                                kind: WorkspaceFileType::File,
                                size: object.size().unwrap_or(0).max(0) as u64,
                            });
                        }
                    }
                    _ => {}
                }
            }

            // Continue only while the page is truncated and a token is available.
            next_token = if page.is_truncated().unwrap_or(false) {
                page.next_continuation_token().map(str::to_string)
            } else {
                None
            };
            if next_token.is_none() {
                break;
            }
        }

        if seen == 0 && !path.is_root() {
            return Err(WorkspaceError::NotFound {
                path: format!(
                    "s3://{}/{}",
                    self.bucket,
                    listing_prefix.trim_end_matches('/')
                ),
            });
        }
        Ok(children)
    }
}
#[async_trait]
impl WorkspaceFileSystemExt for S3WorkspaceBackend {
    /// Reads the object and returns its text together with its ETag.
    async fn read_text_with_version(
        &self,
        path: &WorkspacePath,
    ) -> WorkspaceResult<(String, String)> {
        self.get_object_text(path).await
    }

    /// Uploads `content` with an `If-Match` precondition on `expected_version`.
    ///
    /// An empty `expected_version` is rejected as an invalid argument; failed
    /// uploads are translated by `map_put_error`.
    async fn write_text_if_version(
        &self,
        path: &WorkspacePath,
        content: &str,
        expected_version: &str,
    ) -> WorkspaceResult<WorkspaceWriteOutcome> {
        if expected_version.is_empty() {
            let message = "write_text_if_version requires a non-empty expected version (got empty); \
                           use write_text for unconditional writes"
                .to_string();
            return Err(WorkspaceError::InvalidArgument { message });
        }

        let object_key = self.key_for(path);
        let payload = ByteStream::from(content.as_bytes().to_vec());
        let payload_len = content.len() as u64;
        let started = std::time::Instant::now();
        let put_result = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(&object_key)
            .if_match(expected_version)
            .body(payload)
            .content_type("text/plain; charset=utf-8")
            .send()
            .await;
        emit_s3_call_event(
            "s3.put_object_if_match",
            &self.bucket,
            &object_key,
            payload_len,
            put_result.is_ok(),
            started.elapsed(),
        );

        put_result
            .map(|_| WorkspaceWriteOutcome {
                bytes: content.len(),
                lines: content.lines().count(),
            })
            .map_err(|err| map_put_error(&self.bucket, &object_key, expected_version, err))
    }
}
impl S3WorkspaceBackend {
    /// Lists objects below `base` without a delimiter, i.e. at any depth.
    ///
    /// Returns `(entries, truncated)`, where each entry is the key relative
    /// to the listing prefix plus the object size. `truncated` is `true` when
    /// another object was encountered after `max_objects` entries had
    /// already been collected.
    async fn list_recursive_under(
        &self,
        base: &WorkspacePath,
        max_objects: usize,
    ) -> Result<(Vec<(String, u64)>, bool)> {
        let listing_prefix = self.list_prefix_for(base);
        let mut found: Vec<(String, u64)> = Vec::new();
        let mut next_token: Option<String> = None;
        loop {
            let request = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(&listing_prefix);
            let request = match next_token.as_ref() {
                Some(token) => request.continuation_token(token),
                None => request,
            };
            let started = std::time::Instant::now();
            let page_result = request.send().await;
            let listed = match page_result.as_ref() {
                Ok(page) => page.contents().len() as u64,
                Err(_) => 0,
            };
            emit_s3_call_event(
                "s3.list_objects_v2_recursive",
                &self.bucket,
                &listing_prefix,
                listed,
                page_result.is_ok(),
                started.elapsed(),
            );
            let page = page_result
                .map_err(|e| classify_list_error(&self.bucket, &listing_prefix, e))?;

            for object in page.contents() {
                // The cap is checked before looking at the object, so hitting
                // it with anything still pending marks the scan as truncated.
                if found.len() >= max_objects {
                    return Ok((found, true));
                }
                let relative = object
                    .key()
                    .filter(|key| *key != listing_prefix)
                    .and_then(|key| key.strip_prefix(listing_prefix.as_str()))
                    .filter(|rel| !rel.is_empty());
                if let Some(relative) = relative {
                    let size = object.size().unwrap_or(0).max(0) as u64;
                    found.push((relative.to_string(), size));
                }
            }

            next_token = if page.is_truncated().unwrap_or(false) {
                page.next_continuation_token().map(str::to_string)
            } else {
                None
            };
            if next_token.is_none() {
                break;
            }
        }
        Ok((found, false))
    }
}
#[async_trait]
impl WorkspaceSearch for S3WorkspaceBackend {
async fn glob(&self, request: WorkspaceGlobRequest) -> Result<WorkspaceGlobResult> {
validate_relative_pattern(&request.pattern, "glob pattern")?;
let pattern = glob::Pattern::new(&request.pattern)
.map_err(|e| anyhow!("Invalid glob pattern '{}': {}", request.pattern, e))?;
let recursive = request.pattern.contains("**");
let (entries, scan_truncated) = self
.list_recursive_under(&request.base, self.max_objects_scanned)
.await?;
if scan_truncated {
tracing::debug!(
"S3 glob scan truncated at {} objects under s3://{}/{}",
self.max_objects_scanned,
self.bucket,
self.list_prefix_for(&request.base)
);
}
let mut matches = Vec::new();
for (rel, _size) in entries {
if !recursive && rel.contains('/') {
continue;
}
if pattern.matches(&rel) {
matches.push(join_workspace_path(&request.base, &rel));
}
}
matches.sort_by(|a, b| a.as_str().cmp(b.as_str()));
Ok(WorkspaceGlobResult { matches })
}
async fn grep(&self, request: WorkspaceGrepRequest) -> Result<WorkspaceGrepResult> {
use futures::stream::StreamExt;
if let Some(ref g) = request.glob {
validate_relative_pattern(g, "grep glob filter")?;
}
let regex_pattern = if request.case_insensitive {
format!("(?i){}", request.pattern)
} else {
request.pattern.clone()
};
let regex = std::sync::Arc::new(
regex::Regex::new(®ex_pattern)
.map_err(|e| anyhow!("Invalid regex pattern '{}': {}", request.pattern, e))?,
);
let glob_filter = match request.glob.as_deref() {
Some(g) => Some((
glob::Pattern::new(g)
.map_err(|e| anyhow!("Invalid grep glob filter '{}': {}", g, e))?,
g.contains('/'),
)),
None => None,
};
let (entries, scan_truncated) = self
.list_recursive_under(&request.base, self.max_objects_scanned)
.await?;
let listing_prefix = self.list_prefix_for(&request.base);
let candidates: Vec<(WorkspacePath, String)> = entries
.into_iter()
.filter_map(|(rel, size)| {
if let Some((ref pat, has_sep)) = glob_filter {
let target = if has_sep {
rel.as_str()
} else {
basename(&rel)
};
if !pat.matches(target) {
return None;
}
}
if size > self.max_grep_bytes_per_object {
tracing::debug!(
"Skipping S3 object {}{} ({} bytes > grep cap {})",
listing_prefix,
rel,
size,
self.max_grep_bytes_per_object
);
return None;
}
let ws_path = join_workspace_path(&request.base, &rel);
let display_str = ws_path.as_str().to_string();
Some((ws_path, display_str))
})
.collect();
type FileMatch = (String, Vec<String>, Vec<usize>);
let regex_for_stream = std::sync::Arc::clone(®ex);
let listing_prefix_for_stream = listing_prefix.clone();
let per_file: Vec<Option<FileMatch>> = futures::stream::iter(candidates.into_iter())
.map(|(ws_path, display_str)| {
let regex = std::sync::Arc::clone(®ex_for_stream);
let listing_prefix = listing_prefix_for_stream.clone();
async move {
let content = match self.read_text(&ws_path).await {
Ok(c) => c,
Err(e) => {
tracing::debug!(
"Skipping S3 object {}{}: {}",
listing_prefix,
ws_path.as_str(),
e
);
return None;
}
};
let lines: Vec<String> = content.lines().map(|s| s.to_string()).collect();
let mut file_matches: Vec<usize> = Vec::new();
for (idx, line) in lines.iter().enumerate() {
if regex.is_match(line) {
file_matches.push(idx);
}
}
if file_matches.is_empty() {
None
} else {
Some((display_str, lines, file_matches))
}
}
})
.buffer_unordered(self.search_concurrency.max(1))
.collect()
.await;
let mut hits: Vec<FileMatch> = per_file.into_iter().flatten().collect();
hits.sort_by(|a, b| a.0.cmp(&b.0));
let mut output = String::new();
let mut match_count = 0usize;
let mut file_count = 0usize;
let mut total_size = 0usize;
let mut output_truncated = false;
'outer: for (display_str, lines, file_matches) in hits {
file_count += 1;
for &match_idx in &file_matches {
if total_size > request.max_output_size {
output_truncated = true;
break 'outer;
}
match_count += 1;
let start = match_idx.saturating_sub(request.context_lines);
let end = (match_idx + request.context_lines + 1).min(lines.len());
for (i, line) in lines[start..end].iter().enumerate() {
let abs_i = start + i;
let prefix = if abs_i == match_idx { ">" } else { " " };
let line = format!("{}{}:{}: {}\n", prefix, display_str, abs_i + 1, line);
total_size += line.len();
output.push_str(&line);
}
if request.context_lines > 0 {
output.push_str("--\n");
total_size += 3;
}
}
}
Ok(WorkspaceGrepResult {
output,
match_count,
file_count,
truncated: output_truncated || scan_truncated,
})
}
}
/// Appends the relative key `rel` to `base`, yielding `rel` itself when
/// `base` is the workspace root.
fn join_workspace_path(base: &WorkspacePath, rel: &str) -> WorkspacePath {
    if !base.is_root() {
        let joined = format!("{}/{}", base.as_str(), rel);
        return WorkspacePath::from_normalized(joined);
    }
    WorkspacePath::from_normalized(rel)
}
/// Returns the part of `rel` after its last `/`, or all of `rel` when it
/// contains no `/`.
fn basename(rel: &str) -> &str {
    match rel.rfind('/') {
        // '/' is a single byte, so `slash + 1` is always a char boundary.
        Some(slash) => &rel[slash + 1..],
        None => rel,
    }
}
/// Size guard applied before an object body is read.
///
/// Succeeds only when `advertised` is present, non-negative and not larger
/// than `max_bytes`; otherwise returns an error naming `s3://bucket/key`.
fn validate_content_length(
    advertised: Option<i64>,
    max_bytes: u64,
    bucket: &str,
    key: &str,
) -> Result<()> {
    let Some(length) = advertised else {
        return Err(anyhow!(
            "S3 object s3://{}/{} did not report Content-Length; refusing to read \
             without a size guard. Check that the endpoint is S3-compliant.",
            bucket,
            key
        ));
    };
    if length < 0 {
        return Err(anyhow!(
            "S3 object s3://{}/{} reported invalid content-length {}",
            bucket,
            key,
            length
        ));
    }
    // Non-negative at this point, so the cast cannot wrap.
    if (length as u64) > max_bytes {
        return Err(anyhow!(
            "S3 object s3://{}/{} is {} bytes, exceeds workspace max_read_bytes ({}); \
             raise S3BackendConfig::max_read_bytes if the read is legitimate",
            bucket,
            key,
            length,
            max_bytes
        ));
    }
    Ok(())
}
/// Removes every leading and trailing `/` from `prefix`.
fn normalize_prefix(prefix: &str) -> String {
    prefix.trim_matches('/').to_owned()
}
/// Extracts the directory name from a common prefix returned by a delimited
/// listing: the part after `listing_prefix` with trailing `/` removed.
/// Returns `None` when the prefix does not match or nothing remains.
fn strip_dir_name(common_prefix: &str, listing_prefix: &str) -> Option<String> {
    common_prefix
        .strip_prefix(listing_prefix)
        .map(|rest| rest.trim_end_matches('/'))
        .filter(|name| !name.is_empty())
        .map(str::to_owned)
}
/// Extracts the file name of an object that sits directly below
/// `listing_prefix`. Returns `None` when the key is outside the prefix,
/// equals it, or lies in a deeper directory.
fn strip_file_name(key: &str, listing_prefix: &str) -> Option<String> {
    match key.strip_prefix(listing_prefix) {
        Some(name) if !name.is_empty() && !name.contains('/') => Some(name.to_owned()),
        _ => None,
    }
}
/// Translates a failed GET into a workspace error: HTTP 404 becomes
/// `NotFound`, anything else (including failures without a raw response)
/// becomes a backend error carrying the SDK error text.
fn classify_get_error<E>(bucket: &str, key: &str, error: SdkError<E>) -> WorkspaceError
where
    E: std::error::Error + Send + Sync + 'static,
{
    let http_status = error.raw_response().map(|resp| resp.status().as_u16());
    match http_status {
        Some(404) => WorkspaceError::NotFound {
            path: format!("s3://{}/{}", bucket, key),
        },
        _ => WorkspaceError::Backend(anyhow!(
            "Failed to read S3 object s3://{}/{}: {}",
            bucket,
            key,
            error
        )),
    }
}
/// Wraps a failed listing call in a backend error that names the bucket and
/// prefix that were being listed.
fn classify_list_error(
    bucket: &str,
    prefix: &str,
    error: SdkError<ListObjectsV2Error>,
) -> WorkspaceError {
    let cause = anyhow!(
        "Failed to list S3 prefix s3://{}/{}: {}",
        bucket,
        prefix,
        error
    );
    WorkspaceError::Backend(cause)
}
/// Emits one debug-level tracing event describing a finished S3 call:
/// operation name, bucket, target key or prefix, a byte/item figure supplied
/// by the caller, the outcome and the elapsed time in milliseconds.
fn emit_s3_call_event(
    op: &'static str,
    bucket: &str,
    target: &str,
    bytes: u64,
    ok: bool,
    elapsed: std::time::Duration,
) {
    let outcome = match ok {
        true => "ok",
        false => "error",
    };
    let duration_ms = elapsed.as_millis() as u64;
    tracing::debug!(
        op = op,
        bucket = %bucket,
        target = %target,
        bytes = bytes,
        outcome = outcome,
        duration_ms = duration_ms,
    );
}
/// Translates a failed conditional PUT: HTTP 412 becomes a
/// `VersionConflict` carrying the expected version (the actual one is
/// unknown), anything else becomes a backend error.
fn map_put_error(
    bucket: &str,
    key: &str,
    expected_version: &str,
    error: SdkError<PutObjectError>,
) -> WorkspaceError {
    let http_status = error.raw_response().map(|resp| resp.status().as_u16());
    match http_status {
        Some(412) => WorkspaceError::VersionConflict(WorkspaceVersionConflict {
            path: format!("s3://{}/{}", bucket, key),
            expected: expected_version.to_string(),
            actual: None,
        }),
        _ => WorkspaceError::Backend(anyhow!(
            "Failed to write S3 object s3://{}/{}: {}",
            bucket,
            key,
            error
        )),
    }
}
impl super::WorkspaceServices {
    /// Builds workspace services backed by a freshly constructed S3 backend.
    pub fn s3(config: S3BackendConfig) -> Arc<Self> {
        Self::from_s3_backend(Arc::new(S3WorkspaceBackend::new(config)))
    }

    /// Builds workspace services around an existing S3 backend.
    ///
    /// The backend is registered as file system and versioned file system
    /// with a 60 second operation timeout; it is registered as search
    /// provider only when its search flag is enabled.
    pub fn from_s3_backend(backend: Arc<S3WorkspaceBackend>) -> Arc<Self> {
        // Both arguments of the workspace reference carry the same URI.
        let uri = format!("s3://{}/{}", backend.bucket(), backend.prefix());
        let workspace_ref = super::WorkspaceRef::new(uri.clone(), uri);

        let file_system: Arc<dyn WorkspaceFileSystem> = backend.clone();
        let versioned: Arc<dyn WorkspaceFileSystemExt> = backend.clone();
        let builder = Self::builder(workspace_ref, file_system)
            .file_system_ext(versioned)
            .operation_timeout(Duration::from_secs(60));

        if !backend.search_enabled() {
            return builder.build();
        }
        let search: Arc<dyn WorkspaceSearch> = backend;
        builder.search(search).build()
    }
}
// Unit tests for key/prefix mapping, config defaults, limit handling, the
// content-length guard, the services factory and the small path helpers.
// None of them issues an S3 request; backends are only constructed.
#[cfg(test)]
mod tests {
use super::*;
// The root path maps to the bare prefix, without a trailing slash.
#[test]
fn key_for_root_uses_prefix_only() {
let backend = make_backend("ws/u1/s1");
let key = backend.key_for(&WorkspacePath::root());
assert_eq!(key, "ws/u1/s1");
}
// Non-root paths are appended to the prefix with a single '/'.
#[test]
fn key_for_nested_path_joins_with_slash() {
let backend = make_backend("ws/u1/s1");
let key = backend.key_for(&WorkspacePath::from_normalized("src/main.rs"));
assert_eq!(key, "ws/u1/s1/src/main.rs");
}
// With an empty prefix the key is the path itself; the root key is empty.
#[test]
fn key_for_empty_prefix_uses_path_only() {
let backend = make_backend("");
assert_eq!(
backend.key_for(&WorkspacePath::from_normalized("notes.txt")),
"notes.txt"
);
assert_eq!(backend.key_for(&WorkspacePath::root()), "");
}
// Listing the root uses the prefix followed by '/'.
#[test]
fn list_prefix_root_with_workspace_prefix() {
let backend = make_backend("ws/u1/s1");
assert_eq!(backend.list_prefix_for(&WorkspacePath::root()), "ws/u1/s1/");
}
// Listing the root of a prefix-less backend uses the empty string.
#[test]
fn list_prefix_root_with_empty_workspace_prefix() {
let backend = make_backend("");
assert_eq!(backend.list_prefix_for(&WorkspacePath::root()), "");
}
// Listing a sub-directory uses "prefix/path/".
#[test]
fn list_prefix_nested_path() {
let backend = make_backend("ws/u1/s1");
let path = WorkspacePath::from_normalized("src");
assert_eq!(backend.list_prefix_for(&path), "ws/u1/s1/src/");
}
// Leading and trailing slashes are removed; inner ones are kept.
#[test]
fn normalize_prefix_strips_slashes() {
assert_eq!(normalize_prefix("/foo/bar/"), "foo/bar");
assert_eq!(normalize_prefix("foo"), "foo");
assert_eq!(normalize_prefix(""), "");
assert_eq!(normalize_prefix("/"), "");
}
// Only a non-empty remainder below the listing prefix yields a name.
#[test]
fn strip_dir_name_extracts_immediate_child() {
assert_eq!(
strip_dir_name("ws/u1/s1/src/", "ws/u1/s1/"),
Some("src".to_string())
);
assert_eq!(strip_dir_name("ws/u1/s1/", "ws/u1/s1/"), None);
assert_eq!(strip_dir_name("other/", "ws/u1/s1/"), None);
}
// Keys in deeper directories or outside the prefix are rejected.
#[test]
fn strip_file_name_rejects_nested_keys() {
assert_eq!(
strip_file_name("ws/u1/s1/notes.txt", "ws/u1/s1/"),
Some("notes.txt".to_string())
);
assert_eq!(strip_file_name("ws/u1/s1/src/main.rs", "ws/u1/s1/"), None);
assert_eq!(strip_file_name("other/notes.txt", "ws/u1/s1/"), None);
}
// Each chained setter stores its value in the matching config field.
#[test]
fn config_builder_sets_fields() {
let cfg = S3BackendConfig::new("bucket", "prefix", "AK", "SK")
.endpoint("https://minio.local:9000")
.region("cn-east-1")
.session_token("TOKEN")
.force_path_style(true)
.request_timeout(Duration::from_secs(5))
.max_read_bytes(4096);
assert_eq!(cfg.bucket, "bucket");
assert_eq!(cfg.prefix, "prefix");
assert_eq!(cfg.endpoint.as_deref(), Some("https://minio.local:9000"));
assert_eq!(cfg.region.as_deref(), Some("cn-east-1"));
assert_eq!(cfg.session_token.as_deref(), Some("TOKEN"));
assert!(cfg.force_path_style);
assert_eq!(cfg.request_timeout, Some(Duration::from_secs(5)));
assert_eq!(cfg.max_read_bytes, Some(4096));
}
// The config itself carries no read cap until one is set explicitly.
#[test]
fn config_default_max_read_bytes_is_none_until_set() {
let cfg = S3BackendConfig::new("bucket", "prefix", "AK", "SK");
assert!(cfg.max_read_bytes.is_none());
}
// The backend substitutes the default read cap for an unset config value.
#[test]
fn backend_applies_default_max_read_bytes_when_config_omits_it() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK");
let backend = S3WorkspaceBackend::new(cfg);
assert_eq!(backend.max_read_bytes(), DEFAULT_MAX_READ_BYTES);
}
// An explicit read cap is passed through unchanged.
#[test]
fn backend_respects_config_max_read_bytes_override() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK").max_read_bytes(2048);
let backend = S3WorkspaceBackend::new(cfg);
assert_eq!(backend.max_read_bytes(), 2048);
}
// A read cap of zero is replaced by the default.
#[test]
fn backend_treats_zero_max_read_bytes_as_default() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK").max_read_bytes(0);
let backend = S3WorkspaceBackend::new(cfg);
assert_eq!(backend.max_read_bytes(), DEFAULT_MAX_READ_BYTES);
}
// Lengths up to and including the cap are accepted.
#[test]
fn validate_content_length_allows_within_cap() {
assert!(validate_content_length(Some(1024), 4096, "bucket", "key").is_ok());
assert!(validate_content_length(Some(0), 4096, "bucket", "key").is_ok());
assert!(validate_content_length(Some(4096), 4096, "bucket", "key").is_ok());
}
// One byte over the cap is rejected, and the message names the object.
#[test]
fn validate_content_length_rejects_over_cap() {
let err = validate_content_length(Some(4097), 4096, "bucket", "ws/big.txt").unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("exceeds workspace max_read_bytes"),
"msg: {msg}"
);
assert!(msg.contains("s3://bucket/ws/big.txt"), "msg: {msg}");
}
// A missing Content-Length is an error rather than an unguarded read.
#[test]
fn validate_content_length_rejects_missing_header() {
let err = validate_content_length(None, 4096, "bucket", "ws/key").unwrap_err();
assert!(err.to_string().contains("did not report Content-Length"));
}
// Negative lengths are reported as invalid.
#[test]
fn validate_content_length_rejects_negative_length() {
let err = validate_content_length(Some(-1), 4096, "bucket", "ws/key").unwrap_err();
assert!(err.to_string().contains("invalid content-length"));
}
// By default the S3 services expose read/write only, with a 60 s timeout.
#[test]
fn services_s3_factory_disables_exec_search_and_git_by_default() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK");
let services = super::super::WorkspaceServices::s3(cfg);
let caps = services.capabilities();
assert!(caps.read);
assert!(caps.write);
assert!(!caps.exec);
assert!(!caps.search);
assert!(!caps.git);
assert!(services.command_runner().is_none());
assert!(services.search().is_none());
assert!(services.git().is_none());
assert!(services.git_stash().is_none());
assert!(services.git_worktree().is_none());
assert_eq!(services.operation_timeout(), Some(Duration::from_secs(60)));
}
// Enabling search in the config wires the search provider and capability.
#[test]
fn services_s3_factory_registers_search_when_enabled() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK").enable_search(true);
let services = super::super::WorkspaceServices::s3(cfg);
let caps = services.capabilities();
assert!(caps.search, "search capability must be on when enabled");
assert!(
services.search().is_some(),
"search provider must be wired when enabled"
);
assert!(!caps.exec);
assert!(!caps.git);
}
// Search settings start disabled/unset and are stored once configured.
#[test]
fn config_search_defaults_off_until_enabled() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK");
assert!(!cfg.search_enabled);
assert!(cfg.max_objects_scanned.is_none());
assert!(cfg.max_grep_bytes_per_object.is_none());
let cfg = cfg
.enable_search(true)
.max_objects_scanned(50)
.max_grep_bytes_per_object(256 * 1024);
assert!(cfg.search_enabled);
assert_eq!(cfg.max_objects_scanned, Some(50));
assert_eq!(cfg.max_grep_bytes_per_object, Some(256 * 1024));
}
// Unset search limits resolve to the default constants in the backend.
#[test]
fn backend_applies_search_defaults_when_config_omits_them() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK").enable_search(true);
let backend = S3WorkspaceBackend::new(cfg);
assert!(backend.search_enabled());
assert_eq!(backend.max_objects_scanned(), DEFAULT_MAX_OBJECTS_SCANNED);
assert_eq!(
backend.max_grep_bytes_per_object(),
DEFAULT_MAX_GREP_BYTES_PER_OBJECT
);
}
// Zero-valued search limits are replaced by the defaults as well.
#[test]
fn backend_treats_zero_search_limits_as_defaults() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK")
.enable_search(true)
.max_objects_scanned(0)
.max_grep_bytes_per_object(0)
.search_concurrency(0);
let backend = S3WorkspaceBackend::new(cfg);
assert_eq!(backend.max_objects_scanned(), DEFAULT_MAX_OBJECTS_SCANNED);
assert_eq!(
backend.max_grep_bytes_per_object(),
DEFAULT_MAX_GREP_BYTES_PER_OBJECT
);
assert_eq!(backend.search_concurrency(), DEFAULT_SEARCH_CONCURRENCY);
}
// Unset concurrency resolves to the default constant.
#[test]
fn backend_applies_search_concurrency_default() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK").enable_search(true);
let backend = S3WorkspaceBackend::new(cfg);
assert_eq!(backend.search_concurrency(), DEFAULT_SEARCH_CONCURRENCY);
}
// An explicit concurrency value is passed through unchanged.
#[test]
fn backend_respects_search_concurrency_override() {
let cfg = S3BackendConfig::new("bucket", "ws", "AK", "SK")
.enable_search(true)
.search_concurrency(16);
let backend = S3WorkspaceBackend::new(cfg);
assert_eq!(backend.search_concurrency(), 16);
}
// Joining onto the root yields the relative path; otherwise "base/rel".
#[test]
fn join_workspace_path_handles_root_and_nested_bases() {
let root = WorkspacePath::root();
let joined = join_workspace_path(&root, "main.rs");
assert_eq!(joined.as_str(), "main.rs");
let base = WorkspacePath::from_normalized("src");
let joined = join_workspace_path(&base, "foo/main.rs");
assert_eq!(joined.as_str(), "src/foo/main.rs");
}
// basename returns the text after the last '/'.
#[test]
fn basename_returns_last_segment() {
assert_eq!(basename("src/main.rs"), "main.rs");
assert_eq!(basename("main.rs"), "main.rs");
assert_eq!(basename("a/b/c/d.txt"), "d.txt");
}
// Pins the `glob` crate behaviour that the glob search compensates for:
// with default options `*` also matches across '/'.
#[test]
fn glob_pattern_matches_is_permissive_across_slashes() {
let permissive = glob::Pattern::new("*.rs").unwrap();
assert!(permissive.matches("main.rs"));
assert!(
permissive.matches("src/main.rs"),
"`glob` crate's `*` matches across `/`; if this ever changes, drop \
the manual `rel.contains('/')` guard in WorkspaceSearch::glob"
);
let recursive = glob::Pattern::new("**/*.rs").unwrap();
assert!(recursive.matches("src/main.rs"));
assert!(recursive.matches("main.rs"));
}
// Test helper: builds a backend for bucket "bucket" with dummy credentials.
fn make_backend(prefix: &str) -> S3WorkspaceBackend {
let cfg = S3BackendConfig::new("bucket", prefix, "AK", "SK");
S3WorkspaceBackend::new(cfg)
}
}