use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use shedul3r_rs_sdk::TaskPayload;
use crate::auth::{merge_env, Auth};
use crate::bundle::Bundle;
use crate::error::PipelineError;
use crate::executor::{extract_step_name, Executor};
use crate::model::{Model, Tool};
use crate::pool::run_pool;
use crate::task::{build_task_yaml, TaskConfig};
/// Outcome of one agent invocation.
#[derive(Debug, Clone)]
pub struct AgentResult {
/// Whether the invocation reported success.
pub success: bool,
/// Output text of the invocation; `require_success` copies it into the error on failure.
pub output: String,
}
impl AgentResult {
    /// Turns an unsuccessful result into an error so callers can chain with `?`.
    ///
    /// Returns a borrow of `self` unchanged when `success` is `true`.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::AgentFailed`] carrying a copy of `output` when
    /// `success` is `false`.
    pub fn require_success(&self) -> Result<&Self, PipelineError> {
        if !self.success {
            return Err(PipelineError::AgentFailed {
                message: self.output.clone(),
            });
        }
        Ok(self)
    }
}
/// Per-item description of agent work, produced by the mapper given to a batch builder.
///
/// Every field is optional while building; a missing prompt is rejected when the task runs.
#[derive(Debug, Clone, Default)]
#[must_use]
pub struct AgentTask {
/// Prompt text submitted as the task input (required at execution time).
pub(crate) prompt: Option<String>,
/// Local working directory; also the destination for downloaded bundle outputs.
pub(crate) working_dir: Option<PathBuf>,
/// When set, the task is submitted through the client's recovery-aware call with this path.
pub(crate) expected_output: Option<PathBuf>,
/// Files uploaded as a bundle before execution when the executor is remote.
pub(crate) bundle_data: Option<Bundle>,
/// Task-specific auth; when unset the batch's default auth environment is used.
pub(crate) auth: Option<Auth>,
}
impl AgentTask {
pub fn new() -> Self {
Self::default()
}
pub fn prompt(mut self, text: &str) -> Self {
self.prompt = Some(String::from(text));
self
}
pub fn working_dir(mut self, path: &Path) -> Self {
self.working_dir = Some(path.to_path_buf());
self
}
pub fn expected_output(mut self, path: &Path) -> Self {
self.expected_output = Some(path.to_path_buf());
self
}
pub fn bundle(mut self, bundle: Bundle) -> Self {
self.bundle_data = Some(bundle);
self
}
pub fn auth(mut self, auth: Auth) -> Self {
self.auth = Some(auth);
self
}
}
/// Builder for a single agent invocation bound to an [`Executor`].
///
/// Calling `items` converts it into a batch builder; only the executor, name, auth,
/// model, timeout and tools are carried over in that conversion.
#[must_use]
pub struct AgentBuilder<'a> {
/// Executor supplying the client, defaults and dry-run configuration.
executor: &'a Executor,
/// Name written into the generated task configuration.
name: String,
/// Explicit auth; when unset the executor's default auth is used.
auth: Option<&'a Auth>,
/// Model to resolve into a provider-specific identifier.
model: Option<Model>,
/// Timeout, rendered to a compact string before being placed in the task configuration.
timeout: Option<Duration>,
/// Comma-separated tool names built by `tools`.
tools: Option<String>,
/// Prompt text; required by `execute`.
prompt: Option<String>,
/// Local working directory for the invocation.
working_dir: Option<PathBuf>,
/// Path of the output the invocation is expected to produce.
expected_output: Option<PathBuf>,
/// Files to upload as a bundle when the executor is remote.
bundle_data: Option<Bundle>,
}
impl<'a> AgentBuilder<'a> {
pub(crate) fn new(executor: &'a Executor, name: &str) -> Self {
Self {
executor,
name: String::from(name),
auth: None,
model: None,
timeout: None,
tools: None,
prompt: None,
working_dir: None,
expected_output: None,
bundle_data: None,
}
}
pub const fn auth(mut self, auth: &'a Auth) -> Self {
self.auth = Some(auth);
self
}
pub fn model(mut self, model: Model) -> Self {
self.model = Some(model);
self
}
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn tools(mut self, tools: &[Tool]) -> Self {
let joined: String = tools
.iter()
.enumerate()
.fold(String::new(), |mut acc, (i, t)| {
if i > 0 {
acc.push(',');
}
acc.push_str(t.as_str());
acc
});
self.tools = Some(joined);
self
}
pub fn prompt(mut self, text: &str) -> Self {
self.prompt = Some(String::from(text));
self
}
pub fn working_dir(mut self, path: &Path) -> Self {
self.working_dir = Some(path.to_path_buf());
self
}
pub fn expected_output(mut self, path: &Path) -> Self {
self.expected_output = Some(path.to_path_buf());
self
}
pub fn bundle(mut self, bundle: Bundle) -> Self {
self.bundle_data = Some(bundle);
self
}
pub fn items<T>(self, items: Vec<T>, concurrency: usize) -> AgentBatchBuilder<'a, T> {
AgentBatchBuilder {
executor: self.executor,
name: self.name,
auth: self.auth,
model: self.model,
timeout: self.timeout,
tools: self.tools,
items,
concurrency,
mapper: None,
}
}
fn resolve_model_string(&self) -> Option<String> {
self.model.as_ref().map(|m| {
let provider = self
.executor
.default_provider()
.cloned()
.unwrap_or_default();
self.executor.model_config().resolve(m, &provider)
})
}
pub async fn execute(self) -> Result<AgentResult, PipelineError> {
let model_str = self.resolve_model_string();
let timeout_str = self.timeout.map(format_duration);
let prompt = self
.prompt
.ok_or_else(|| PipelineError::Config(String::from("agent prompt is required")))?;
let task_yaml = build_task_yaml(&TaskConfig {
name: self.name.clone(),
model: model_str,
timeout: timeout_str,
provider_id: None,
max_concurrent: None,
max_wait: None,
max_retries: None,
allowed_tools: self.tools,
})?;
let auth = self.auth.or_else(|| self.executor.default_auth());
let auth_env = auth
.map(Auth::to_env)
.transpose()?
.unwrap_or_default();
let env = merge_env(auth_env, None);
if let Some(dry_run_mutex) = self.executor.dry_run_config() {
return execute_dry_run_capture(
dry_run_mutex,
&task_yaml,
&prompt,
self.expected_output.as_deref(),
self.working_dir.as_deref(),
env.as_ref(),
self.bundle_data.as_ref(),
);
}
execute_remote_bundle(
self.executor.sdk_client(),
self.executor.is_remote(),
self.bundle_data.as_ref(),
&task_yaml,
&prompt,
self.working_dir.as_deref(),
self.expected_output.as_deref(),
env,
)
.await
}
}
/// Builder that runs one agent task per item with a caller-chosen concurrency.
#[must_use]
pub struct AgentBatchBuilder<'a, T> {
/// Executor supplying the client, defaults and dry-run configuration.
executor: &'a Executor,
/// Name written into every task's generated configuration.
name: String,
/// Batch-level auth; when unset the executor's default auth is used.
auth: Option<&'a Auth>,
/// Model to resolve into a provider-specific identifier.
model: Option<Model>,
/// Timeout, rendered to a compact string before being placed in the task configuration.
timeout: Option<Duration>,
/// Comma-separated tool names.
tools: Option<String>,
/// Items to process; each one is passed to the mapper.
items: Vec<T>,
/// Concurrency value handed to `run_pool`.
concurrency: usize,
/// Mapper from item to task; `execute` fails with a config error while it is unset.
mapper: Option<Box<dyn Fn(T) -> AgentTask + Send + Sync>>,
}
/// Shared, index-addressed result slots for a batch: one `Option` per item, `None`
/// until that item's task stores its result.
type BatchResultStore = Arc<Mutex<Vec<Option<Result<AgentResult, PipelineError>>>>>;
/// Settings shared by every task in a batch, resolved once before tasks are dispatched
/// and cloned alongside each task.
#[derive(Clone)]
struct BatchConfig {
/// Name written into each task's generated configuration.
name: String,
/// Already-resolved model identifier, if a model was chosen.
model: Option<String>,
/// Already-formatted timeout string, if a timeout was set.
timeout: Option<String>,
/// Comma-separated tool names.
tools: Option<String>,
/// Environment derived from the batch-level (or executor default) auth; used for
/// tasks that carry no auth of their own.
default_auth_env: BTreeMap<String, String>,
}
impl<T: Send + 'static> AgentBatchBuilder<'_, T> {
    /// Sets the mapper that turns each item into the [`AgentTask`] to run for it.
    pub fn for_each<F>(mut self, f: F) -> Self
    where
        F: Fn(T) -> AgentTask + Send + Sync + 'static,
    {
        self.mapper = Some(Box::new(f));
        self
    }

    /// Resolves the configured model through the executor's model configuration, using
    /// the executor's default provider (or that type's default). `None` when no model
    /// is set.
    fn resolve_model_string(&self) -> Option<String> {
        self.model.as_ref().map(|m| {
            let provider = self
                .executor
                .default_provider()
                .cloned()
                .unwrap_or_default();
            self.executor.model_config().resolve(m, &provider)
        })
    }

    /// Runs one task per item and returns one `Result` per item.
    ///
    /// Per-task failures are reported inside the returned vector, in both dry-run and
    /// real mode; the outer `Result` only covers batch-level problems. A warning is
    /// logged when some tasks succeed and others fail.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::Config`] when no mapper was set, propagates a failure
    /// to convert the batch-level auth into environment variables, and returns
    /// [`PipelineError::Other`] if the result store is still shared after the pool
    /// finishes.
    pub async fn execute(self) -> Result<Vec<Result<AgentResult, PipelineError>>, PipelineError> {
        let model_str = self.resolve_model_string();
        let timeout_str = self.timeout.map(format_duration);
        let mapper = self.mapper.ok_or_else(|| {
            PipelineError::Config(String::from("batch requires a for_each mapper"))
        })?;

        // Batch-level auth wins; otherwise fall back to the executor default.
        let default_auth = self.auth.or_else(|| self.executor.default_auth());
        let default_auth_env = default_auth
            .map(Auth::to_env)
            .transpose()?
            .unwrap_or_default();
        let config = BatchConfig {
            name: self.name.clone(),
            model: model_str,
            timeout: timeout_str,
            tools: self.tools.clone(),
            default_auth_env,
        };

        let total = self.items.len();
        let tasks: Vec<(AgentTask, BatchConfig)> = self
            .items
            .into_iter()
            .map(|item| (mapper(item), config.clone()))
            .collect();

        if let Some(dry_run_mutex) = self.executor.dry_run_config() {
            // Keep each task's outcome in its own slot, mirroring the real path:
            // one bad task must not turn the whole batch into an outer `Err`.
            let mut results = Vec::with_capacity(total);
            for (task, cfg) in &tasks {
                results.push(execute_batch_task_dry_run(task, cfg, dry_run_mutex));
            }
            return Ok(results);
        }

        // One empty slot per item; each task fills the slot at the index `run_pool`
        // passes to its closure.
        let results_store: BatchResultStore =
            Arc::new(Mutex::new((0..total).map(|_| None).collect()));
        let client = self.executor.sdk_client().clone();
        let remote = self.executor.is_remote();
        let results_for_pool = Arc::clone(&results_store);
        let _pool_outcomes = run_pool(tasks, self.concurrency, move |pair, index| {
            let client = client.clone();
            let store = Arc::clone(&results_for_pool);
            async move {
                let (task, cfg) = pair;
                let result = execute_single_task(&task, &cfg, &client, remote).await;
                {
                    // A poisoned lock still holds usable slots, so recover the guard.
                    let mut guard = store
                        .lock()
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    if let Some(slot) = guard.get_mut(index) {
                        *slot = Some(result);
                    }
                }
                Ok(())
            }
        })
        .await;

        let inner = Arc::try_unwrap(results_store)
            .map_err(|_| {
                PipelineError::Other(String::from(
                    "batch results Arc still shared after pool completion",
                ))
            })?
            .into_inner()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        // A slot that was never filled is reported as a failed task.
        let results: Vec<Result<AgentResult, PipelineError>> = inner
            .into_iter()
            .map(|opt| {
                opt.unwrap_or_else(|| {
                    Err(PipelineError::AgentFailed {
                        message: String::from("batch task result missing"),
                    })
                })
            })
            .collect();

        let (succeeded, failed) = count_batch_outcomes(&results);
        if is_partial_failure(succeeded, failed) {
            tracing::warn!(
                "Batch partial failure: {succeeded} succeeded, {failed} failed out of {total}"
            );
        }
        Ok(results)
    }
}
/// Counts the `Ok` and `Err` entries of `results`, returned as `(succeeded, failed)`.
fn count_batch_outcomes<T, E>(results: &[Result<T, E>]) -> (usize, usize) {
    let succeeded = results.iter().filter(|outcome| outcome.is_ok()).count();
    // Every entry is either Ok or Err, so the remainder is the failure count.
    let failed = results.len().saturating_sub(succeeded);
    (succeeded, failed)
}
/// Returns `true` only for a mixed batch: at least one success and at least one failure.
const fn is_partial_failure(succeeded: usize, failed: usize) -> bool {
    succeeded != 0 && failed != 0
}
/// Captures one agent invocation to disk instead of executing it.
///
/// Writes `prompt.md`, `task.yaml` and `meta.json` into
/// `<base_dir>/<step name>/<counter>`, where the counter comes from the shared dry-run
/// configuration and is incremented on every call. `meta.json` records the expected
/// output, the working directory, the names (not values) of the environment variables
/// and the bundle file names.
///
/// Always returns a successful [`AgentResult`] whose output is `(dry-run)`.
///
/// # Errors
///
/// Propagates filesystem errors, and returns [`PipelineError::Other`] if the metadata
/// cannot be serialized.
fn execute_dry_run_capture(
    dry_run_mutex: &std::sync::Mutex<crate::executor::DryRunConfig>,
    task_yaml: &str,
    prompt: &str,
    expected_output: Option<&Path>,
    working_dir: Option<&Path>,
    env: Option<&BTreeMap<String, String>>,
    bundle: Option<&Bundle>,
) -> Result<AgentResult, PipelineError> {
    // Reserve this capture's index under the lock, then release it before any file I/O.
    let mut state = dry_run_mutex
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let step_name = extract_step_name(task_yaml);
    let index = state.counter;
    state.counter = index.saturating_add(1);
    let capture_dir = state.base_dir.join(&step_name).join(index.to_string());
    drop(state);

    std::fs::create_dir_all(&capture_dir)?;
    std::fs::write(capture_dir.join("prompt.md"), prompt)?;
    std::fs::write(capture_dir.join("task.yaml"), task_yaml)?;

    // Only variable names are recorded, never their values.
    let env_keys: Vec<&str> =
        env.map_or_else(Vec::new, |vars| vars.keys().map(String::as_str).collect());
    let bundle_files: Vec<&str> = bundle.map_or_else(Vec::new, |b| {
        b.files().iter().map(|(name, _)| name.as_str()).collect()
    });

    let meta = serde_json::json!({
        "expectedOutput": expected_output.map(|p| p.display().to_string()),
        "workingDirectory": working_dir.map(|p| p.display().to_string()),
        "environment": env_keys,
        "bundleFiles": bundle_files,
    });
    let meta_text = serde_json::to_string_pretty(&meta)
        .map_err(|e| PipelineError::Other(format!("failed to serialize meta: {e}")))?;
    std::fs::write(capture_dir.join("meta.json"), meta_text)?;

    tracing::info!("[dry-run] Captured to {}", capture_dir.display());
    Ok(AgentResult {
        success: true,
        output: String::from("(dry-run)"),
    })
}
fn execute_batch_task_dry_run(
task: &AgentTask,
config: &BatchConfig,
dry_run_mutex: &std::sync::Mutex<crate::executor::DryRunConfig>,
) -> Result<AgentResult, PipelineError> {
let prompt = task
.prompt
.as_ref()
.ok_or_else(|| PipelineError::Config(String::from("agent task prompt is required")))?;
let task_yaml = build_task_yaml(&TaskConfig {
name: config.name.clone(),
model: config.model.clone(),
timeout: config.timeout.clone(),
provider_id: None,
max_concurrent: None,
max_wait: None,
max_retries: None,
allowed_tools: config.tools.clone(),
})?;
let auth_env = if let Some(ref auth) = task.auth {
Some(auth.to_env()?)
} else if config.default_auth_env.is_empty() {
None
} else {
Some(config.default_auth_env.clone())
};
execute_dry_run_capture(
dry_run_mutex,
&task_yaml,
prompt,
task.expected_output.as_deref(),
task.working_dir.as_deref(),
auth_env.as_ref(),
task.bundle_data.as_ref(),
)
}
/// Submits one task through the SDK client, handling the optional remote bundle.
///
/// When `remote` is set and a bundle is supplied, the bundle is uploaded first and its
/// remote path replaces `working_dir` as the task's working directory. After the task
/// finishes, each of the bundle's expected output paths is downloaded into
/// `working_dir` (or the system temp directory when none was given). The uploaded
/// bundle is deleted afterwards whether or not execution succeeded; a failed deletion
/// is only logged.
///
/// # Errors
///
/// Propagates upload, submission, download and local filesystem errors.
#[allow(clippy::too_many_arguments)]
async fn execute_remote_bundle(
    client: &shedul3r_rs_sdk::Client,
    remote: bool,
    bundle: Option<&Bundle>,
    task_yaml: &str,
    prompt: &str,
    working_dir: Option<&Path>,
    expected_output: Option<&Path>,
    env: Option<BTreeMap<String, String>>,
) -> Result<AgentResult, PipelineError> {
    // Upload only when running remotely and there is something to upload.
    let bundle_handle = match bundle {
        Some(bundle) if remote => {
            let file_refs: Vec<(&str, &[u8])> = bundle
                .files()
                .iter()
                .map(|(name, content)| (name.as_str(), content.as_slice()))
                .collect();
            Some(client.upload_bundle(&file_refs).await?)
        }
        _ => None,
    };

    // An uploaded bundle dictates where the task runs.
    let working_directory = match bundle_handle.as_ref() {
        Some(handle) => Some(handle.remote_path.clone()),
        None => working_dir.map(|dir| dir.display().to_string()),
    };

    let payload = TaskPayload {
        task: String::from(task_yaml),
        input: String::from(prompt),
        working_directory,
        environment: env,
        limiter_key: None,
        timeout_ms: None,
    };

    // Run inside an async block so `?` exits here and cleanup below still happens.
    let execution_result = async {
        let result = match expected_output {
            Some(expected) => {
                client
                    .submit_task_with_recovery(&payload, expected)
                    .await?
            }
            None => client.submit_task(&payload).await?,
        };

        if let (Some(handle), Some(bundle)) = (bundle_handle.as_ref(), bundle) {
            // The destination is the same for every output, so compute it once.
            let local_dir =
                working_dir.map_or_else(std::env::temp_dir, std::path::Path::to_path_buf);
            for output_path in bundle.expected_output_paths() {
                let bytes = client.download_file(&handle.id, output_path).await?;
                let local_path = local_dir.join(output_path);
                if let Some(parent) = local_path.parent() {
                    tokio::fs::create_dir_all(parent).await?;
                }
                tokio::fs::write(&local_path, &bytes).await?;
            }
        }

        Ok::<AgentResult, PipelineError>(AgentResult {
            success: result.success,
            output: result.output,
        })
    }
    .await;

    // Best-effort cleanup: a failed delete must not hide the execution result.
    if let Some(handle) = bundle_handle.as_ref() {
        if let Err(e) = client.delete_bundle(&handle.id).await {
            tracing::warn!("failed to delete remote bundle {}: {e}", handle.id);
        }
    }

    execution_result
}
async fn execute_single_task(
task: &AgentTask,
config: &BatchConfig,
client: &shedul3r_rs_sdk::Client,
remote: bool,
) -> Result<AgentResult, PipelineError> {
let prompt = task
.prompt
.as_ref()
.ok_or_else(|| PipelineError::Config(String::from("agent task prompt is required")))?;
let task_yaml = build_task_yaml(&TaskConfig {
name: config.name.clone(),
model: config.model.clone(),
timeout: config.timeout.clone(),
provider_id: None,
max_concurrent: None,
max_wait: None,
max_retries: None,
allowed_tools: config.tools.clone(),
})?;
let auth_env = if let Some(ref auth) = task.auth {
auth.to_env()?
} else {
config.default_auth_env.clone()
};
let env = merge_env(auth_env, None);
execute_remote_bundle(
client,
remote,
task.bundle_data.as_ref(),
&task_yaml,
prompt,
task.working_dir.as_deref(),
task.expected_output.as_deref(),
env,
)
.await
}
/// Renders a duration as a compact string using its two most significant units.
///
/// Durations of an hour or more are rendered as `"<h>h"` or `"<h>h<m>m"` (seconds are
/// dropped); shorter ones as `"<m>m"`, `"<m>m<s>s"` or `"<s>s"`. Sub-second precision
/// is ignored, so anything under one second renders as `"0s"`.
fn format_duration(d: Duration) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3600;

    let total = d.as_secs();
    let hours = total.checked_div(SECS_PER_HOUR).unwrap_or(0);
    let within_hour = total.checked_rem(SECS_PER_HOUR).unwrap_or(0);
    let minutes = within_hour.checked_div(SECS_PER_MINUTE).unwrap_or(0);
    let seconds = within_hour.checked_rem(SECS_PER_MINUTE).unwrap_or(0);

    match (hours, minutes, seconds) {
        (0, 0, s) => format!("{s}s"),
        (0, m, 0) => format!("{m}m"),
        (0, m, s) => format!("{m}m{s}s"),
        // From one hour up, seconds are intentionally not shown.
        (h, 0, _) => format!("{h}h"),
        (h, m, _) => format!("{h}h{m}m"),
    }
}
/// Unit tests for the agent builders, batch helpers and duration formatting.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn agent_result_require_success_ok() {
        let result = AgentResult {
            success: true,
            output: String::from("done"),
        };
        assert!(
            result.require_success().is_ok(),
            "should return Ok for successful agent"
        );
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn agent_result_require_success_err() {
        let result = AgentResult {
            success: false,
            output: String::from("timeout exceeded"),
        };
        let err = result.require_success();
        assert!(err.is_err(), "should return Err for failed agent");
        let msg = err.unwrap_err().to_string();
        assert!(
            msg.contains("timeout exceeded"),
            "error should contain output: {msg}"
        );
    }

    #[test]
    fn format_duration_minutes() {
        assert_eq!(
            format_duration(Duration::from_secs(900)),
            "15m",
            "15 minutes"
        );
    }

    #[test]
    fn format_duration_hours_and_minutes() {
        assert_eq!(
            format_duration(Duration::from_secs(5400)),
            "1h30m",
            "1 hour 30 minutes"
        );
    }

    #[test]
    fn format_duration_seconds_only() {
        assert_eq!(
            format_duration(Duration::from_secs(45)),
            "45s",
            "45 seconds"
        );
    }

    #[test]
    fn format_duration_zero() {
        assert_eq!(format_duration(Duration::from_secs(0)), "0s", "zero");
    }

    #[test]
    fn format_duration_exact_hour() {
        assert_eq!(
            format_duration(Duration::from_secs(3600)),
            "1h",
            "exact hour"
        );
    }

    #[test]
    fn agent_task_builder_chain() {
        let task = AgentTask::new()
            .prompt("hello")
            .working_dir(Path::new("/tmp"))
            .expected_output(Path::new("/tmp/out.txt"))
            .auth(Auth::ApiKey(String::from("sk-test")));
        assert_eq!(
            task.prompt.as_deref(),
            Some("hello"),
            "prompt should be set"
        );
        assert_eq!(
            task.working_dir.as_deref(),
            Some(Path::new("/tmp")),
            "working_dir should be set"
        );
        assert_eq!(
            task.expected_output.as_deref(),
            Some(Path::new("/tmp/out.txt")),
            "expected_output should be set"
        );
        assert!(task.auth.is_some(), "auth should be set");
    }

    #[test]
    fn agent_task_default_is_empty() {
        let task = AgentTask::new();
        assert!(task.prompt.is_none(), "prompt should default to None");
        assert!(
            task.working_dir.is_none(),
            "working_dir should default to None"
        );
        assert!(
            task.expected_output.is_none(),
            "expected_output should default to None"
        );
        assert!(task.bundle_data.is_none(), "bundle should default to None");
        assert!(task.auth.is_none(), "auth should default to None");
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn batch_dry_run_produces_correct_count() {
        let executor = Executor::with_defaults()
            .unwrap()
            .with_dry_run(PathBuf::from("/tmp/pipelin3r-batch-test"));
        let items: Vec<String> = vec![
            String::from("item_a"),
            String::from("item_b"),
            String::from("item_c"),
        ];
        let results = executor
            .agent("test-batch")
            .model(Model::Sonnet4_6)
            .items(items, 2)
            .for_each(|item| AgentTask::new().prompt(&format!("process {item}")))
            .execute()
            .await
            .unwrap();
        assert_eq!(results.len(), 3, "should produce one result per item");
        for (i, r) in results.iter().enumerate() {
            assert!(r.is_ok(), "item {i} should succeed in dry-run");
        }
        let _ = std::fs::remove_dir_all("/tmp/pipelin3r-batch-test");
    }

    #[test]
    fn regression_require_success_returns_agent_failed_not_other() {
        let result = AgentResult {
            success: false,
            output: String::from("model timeout"),
        };
        let err = result.require_success();
        assert!(err.is_err(), "failed agent must return Err");
        assert!(
            matches!(&err, Err(PipelineError::AgentFailed { message }) if message == "model timeout"),
            "must be PipelineError::AgentFailed with preserved message, got: {err:?}"
        );
    }

    #[test]
    fn mutant_kill_agent_task_bundle_preserves_bundle() {
        let bundle = Bundle::new()
            .add_text_file("test.txt", "content")
            .unwrap_or_else(|_| Bundle::new());
        let task = AgentTask::new().bundle(bundle);
        assert!(
            task.bundle_data.is_some(),
            "bundle_data must be Some after calling .bundle(), not Default::default()"
        );
        let b = task
            .bundle_data
            .as_ref()
            .unwrap_or_else(|| std::process::abort());
        assert_eq!(
            b.file_count(),
            1,
            "bundle must contain the file that was added"
        );
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn mutant_kill_tools_empty_check() {
        let executor = Executor::with_defaults()
            .unwrap()
            .with_dry_run(PathBuf::from("/tmp/pipelin3r-tools-test"));
        let builder_empty = executor.agent("test-tools-empty").tools(&[]);
        assert_eq!(
            builder_empty.tools.as_deref(),
            Some(""),
            "empty tools slice should produce empty string"
        );
        let builder_two = executor
            .agent("test-tools-two")
            .tools(&[Tool::Read, Tool::Write]);
        assert_eq!(
            builder_two.tools.as_deref(),
            Some("Read,Write"),
            "two tools should be comma-separated without leading comma"
        );
        let builder_one = executor.agent("test-tools-one").tools(&[Tool::Grep]);
        assert_eq!(
            builder_one.tools.as_deref(),
            Some("Grep"),
            "single tool should have no comma"
        );
        let _ = std::fs::remove_dir_all("/tmp/pipelin3r-tools-test");
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn mutant_kill_resolve_model_string_returns_correct_id() {
        let executor = Executor::with_defaults()
            .unwrap()
            .with_dry_run(PathBuf::from("/tmp/pipelin3r-model-test"));
        let result = executor
            .agent("test-model")
            .model(Model::Opus4_6)
            .prompt("test prompt")
            .execute()
            .await
            .unwrap();
        assert!(result.success, "dry-run should succeed");
        let task_yaml =
            std::fs::read_to_string("/tmp/pipelin3r-model-test/test-model/0/task.yaml").unwrap();
        assert!(
            task_yaml.contains("claude-opus-4-6"),
            "task YAML must contain the resolved model ID 'claude-opus-4-6', got: {task_yaml}"
        );
        let _ = std::fs::remove_dir_all("/tmp/pipelin3r-model-test");
    }

    #[tokio::test]
    #[allow(clippy::unwrap_used)]
    async fn mutant_kill_batch_partial_failure_counts() {
        let executor = Executor::with_defaults()
            .unwrap()
            .with_dry_run(PathBuf::from("/tmp/pipelin3r-batch-partial"));
        let items = vec![String::from("a"), String::from("b"), String::from("c")];
        let results = executor
            .agent("test-partial")
            .model(Model::Sonnet4_6)
            .items(items, 2)
            .for_each(|item| AgentTask::new().prompt(&format!("do {item}")))
            .execute()
            .await
            .unwrap();
        assert_eq!(results.len(), 3, "should have 3 results");

        // Exercise the production helpers rather than re-deriving their logic here;
        // otherwise a regression in them could never fail this test.
        let (succeeded, failed) = count_batch_outcomes(&results);
        assert_eq!(succeeded, 3, "all 3 dry-run tasks should succeed");
        assert_eq!(failed, 0, "no dry-run tasks should fail");
        assert!(
            !is_partial_failure(succeeded, failed),
            "when all tasks succeed, partial failure check must be false"
        );
        assert!(
            is_partial_failure(2, 1),
            "when some fail and some succeed, partial failure check must be true"
        );
        assert!(
            !is_partial_failure(0, 3),
            "when all tasks fail (succeeded=0), partial failure check must be false"
        );
        let _ = std::fs::remove_dir_all("/tmp/pipelin3r-batch-partial");
    }

    #[test]
    fn mutant_kill_format_duration_zero_vs_nonzero() {
        assert_eq!(
            format_duration(Duration::ZERO),
            "0s",
            "zero duration must format as '0s'"
        );
        assert_eq!(
            format_duration(Duration::from_secs(5)),
            "5s",
            "5 seconds must format as '5s'"
        );
        assert_eq!(
            format_duration(Duration::from_secs(60)),
            "1m",
            "60 seconds must format as '1m'"
        );
        assert_eq!(
            format_duration(Duration::from_secs(61)),
            "1m1s",
            "61 seconds must format as '1m1s'"
        );
        assert_eq!(
            format_duration(Duration::from_secs(3600)),
            "1h",
            "3600 seconds must format as '1h'"
        );
        assert_eq!(
            format_duration(Duration::from_secs(3601)),
            "1h",
            "3601 seconds formats as '1h' (seconds dropped in hour mode)"
        );
        assert_eq!(
            format_duration(Duration::from_secs(3660)),
            "1h1m",
            "3660 seconds must format as '1h1m'"
        );
    }

    #[tokio::test]
    async fn batch_without_mapper_fails() {
        let executor = Executor::with_defaults().unwrap_or_else(|_| {
            Executor::new(&shedul3r_rs_sdk::ClientConfig::default())
                .unwrap_or_else(|_| std::process::abort())
        });
        let items: Vec<u32> = vec![1, 2];
        let result = executor.agent("test").items(items, 1).execute().await;
        assert!(result.is_err(), "should fail without for_each mapper");
    }

    #[test]
    fn mutant_kill_v2_count_batch_outcomes_all_success() {
        let results: Vec<Result<&str, &str>> = vec![Ok("a"), Ok("b"), Ok("c")];
        let (succeeded, failed) = count_batch_outcomes(&results);
        assert_eq!(succeeded, 3, "all Ok results must count as succeeded");
        assert_eq!(failed, 0, "no Err results means failed=0");
        assert!(
            !is_partial_failure(succeeded, failed),
            "all-success (3,0) must NOT be partial failure"
        );
    }

    #[test]
    fn mutant_kill_v2_count_batch_outcomes_all_failed() {
        let results: Vec<Result<&str, &str>> = vec![Err("x"), Err("y"), Err("z")];
        let (succeeded, failed) = count_batch_outcomes(&results);
        assert_eq!(succeeded, 0, "no Ok results means succeeded=0");
        assert_eq!(failed, 3, "all Err results must count as failed");
        assert!(
            !is_partial_failure(succeeded, failed),
            "all-failed (0,3) must NOT be partial failure"
        );
    }

    #[test]
    fn mutant_kill_v2_count_batch_outcomes_partial_failure() {
        let results: Vec<Result<&str, &str>> = vec![Ok("a"), Err("x"), Ok("b")];
        let (succeeded, failed) = count_batch_outcomes(&results);
        assert_eq!(succeeded, 2, "two Ok results");
        assert_eq!(failed, 1, "one Err result");
        assert!(
            is_partial_failure(succeeded, failed),
            "mixed (2,1) must be partial failure"
        );
    }

    #[test]
    fn mutant_kill_v2_count_batch_outcomes_empty() {
        let results: Vec<Result<&str, &str>> = vec![];
        let (succeeded, failed) = count_batch_outcomes(&results);
        assert_eq!(succeeded, 0, "empty batch has 0 succeeded");
        assert_eq!(failed, 0, "empty batch has 0 failed");
        assert!(
            !is_partial_failure(succeeded, failed),
            "empty (0,0) must NOT be partial failure"
        );
    }
}