use crate::Result;
use crate::cli::MatchArgs;
use crate::cli::display_match_results;
use crate::cli::output::{active_mode, emit_success};
use crate::config::ConfigService;
use crate::core::ComponentFactory;
use crate::core::matcher::engine::{FileRelocationMode, MatchOperation};
use crate::core::matcher::{FileDiscovery, MatchConfig, MatchEngine, MediaFileType};
use crate::core::parallel::{
FileProcessingTask, ProcessingOperation, Task, TaskResult, TaskScheduler,
};
use crate::error::SubXError;
use crate::services::ai::AIProvider;
use indicatif::ProgressDrawTarget;
use serde::Serialize;
/// Machine-readable error attached to a single match operation in JSON output.
///
/// Built from the engine's per-operation error (category/code/message are
/// stringified as-is from the outcome).
#[derive(Debug, Serialize)]
pub struct MatchItemError {
    /// Broad error category (stringified from the engine's error category).
    pub category: String,
    /// Stable error code identifying the failure kind.
    pub code: String,
    /// Human-readable failure description.
    pub message: String,
}
/// One video/subtitle pairing considered by the match engine, as reported in
/// JSON output. Covers both accepted operations and rejected candidates.
#[derive(Debug, Serialize)]
pub struct MatchCandidate {
    /// Display path of the video file.
    pub video: String,
    /// Display path of the subtitle file.
    pub subtitle: String,
    /// Match confidence as an integer percentage (0-100, clamped).
    pub confidence: u8,
    /// Whether the pairing was accepted and turned into an operation.
    pub accepted: bool,
    /// Rejection reason; only serialized for rejected candidates.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
/// Outcome of a single subtitle operation (rename/copy/move) in JSON output.
#[derive(Debug, Serialize)]
pub struct MatchOpItem {
    /// Operation kind: "rename", "copy", or "move" (see `op_kind`).
    pub kind: &'static str,
    /// Source subtitle path.
    pub source: String,
    /// Destination path (see `op_target`).
    pub target: String,
    /// Whether the engine reported the operation as actually applied.
    pub applied: bool,
    /// "ok" when no error was reported, "error" otherwise.
    pub status: &'static str,
    /// Failure details; only serialized when the operation errored.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MatchItemError>,
}
/// Aggregate counters for one match run, as reported in JSON output.
#[derive(Debug, Serialize)]
pub struct MatchSummary {
    /// Total candidates considered (accepted + rejected).
    pub total_candidates: usize,
    /// Candidates that became operations.
    pub accepted: usize,
    /// Operations the engine reported as applied.
    pub applied: usize,
    /// Candidates rejected by the engine (below threshold or otherwise).
    pub skipped: usize,
    /// Operations that reported an error.
    pub failed: usize,
}
/// Top-level JSON payload emitted by the `match` command via `emit_success`.
#[derive(Debug, Serialize)]
pub struct MatchPayload {
    /// True when the run made no filesystem changes.
    pub dry_run: bool,
    /// Confidence threshold from the CLI, as a percentage (0-100).
    pub confidence_threshold: u8,
    /// All candidates considered (accepted first, then rejected).
    pub candidates: Vec<MatchCandidate>,
    /// Per-operation outcomes for the accepted candidates.
    pub operations: Vec<MatchOpItem>,
    /// Aggregate counters over candidates and operations.
    pub summary: MatchSummary,
}
/// Classifies an operation for JSON output.
///
/// Operations that do not relocate the subtitle — including the degenerate
/// case where relocation is flagged but the mode is `None` — are "rename";
/// otherwise the relocation mode decides between "copy" and "move".
fn op_kind(op: &MatchOperation) -> &'static str {
    if !op.requires_relocation {
        return "rename";
    }
    match op.relocation_mode {
        FileRelocationMode::Copy => "copy",
        FileRelocationMode::Move => "move",
        FileRelocationMode::None => "rename",
    }
}
/// Destination path for an operation, for JSON output.
///
/// Uses the explicit relocation target when one is set; otherwise the
/// subtitle stays in its own directory under its new file name.
fn op_target(op: &MatchOperation) -> String {
    op.relocation_target_path.as_ref().map_or_else(
        || {
            op.subtitle_file
                .path
                .with_file_name(&op.new_subtitle_name)
                .display()
                .to_string()
        },
        |p| p.display().to_string(),
    )
}
/// Entry point for the `match` command with a borrowed config service.
///
/// Loads the configuration, constructs the AI provider via the component
/// factory, and delegates to [`execute_with_client`].
pub async fn execute(args: MatchArgs, config_service: &dyn ConfigService) -> Result<()> {
    let config = config_service.get_config()?;
    let ai_client = ComponentFactory::new(config_service)?.create_ai_provider()?;
    execute_with_client(args, ai_client, &config).await
}
/// Same as [`execute`], but takes a shared (`Arc`) config service.
pub async fn execute_with_config(
    args: MatchArgs,
    config_service: std::sync::Arc<dyn ConfigService>,
) -> Result<()> {
    let service = config_service.as_ref();
    let config = service.get_config()?;
    let ai_client = ComponentFactory::new(service)?.create_ai_provider()?;
    execute_with_client(args, ai_client, &config).await
}
/// Runs the match workflow with a pre-constructed AI provider.
///
/// Steps: translate CLI flags into a [`MatchConfig`] and build the engine;
/// collect input files; compute match candidates with an audit of accepted
/// and rejected pairs; then either emit a structured [`MatchPayload`] (JSON
/// output mode) or print a human-readable table and apply the operations
/// (unless `--dry-run`).
pub async fn execute_with_client(
    args: MatchArgs,
    ai_client: Box<dyn AIProvider>,
    config: &crate::config::Config,
) -> Result<()> {
    // --copy / --move select how matched subtitles are relocated; with neither
    // flag set, subtitles are renamed in place.
    let relocation_mode = if args.copy {
        crate::core::matcher::engine::FileRelocationMode::Copy
    } else if args.move_files {
        crate::core::matcher::engine::FileRelocationMode::Move
    } else {
        crate::core::matcher::engine::FileRelocationMode::None
    };
    let match_config = MatchConfig {
        // CLI confidence is a 0-100 percentage; the engine expects a 0.0-1.0 fraction.
        confidence_threshold: args.confidence as f32 / 100.0,
        max_sample_length: config.ai.max_sample_length,
        enable_content_analysis: true,
        // Backups are enabled by either the CLI flag or the config default.
        backup_enabled: args.backup || config.general.backup_enabled,
        relocation_mode,
        conflict_resolution: crate::core::matcher::engine::ConflictResolution::AutoRename,
        ai_model: config.ai.model.clone(),
        max_subtitle_bytes: config.general.max_subtitle_bytes,
    };
    let engine = MatchEngine::new(ai_client, match_config);
    let input_handler = args.get_input_handler()?;
    let files = input_handler
        .collect_files()
        .map_err(|e| SubXError::CommandExecution(format!("Failed to collect files: {e}")))?;
    if files.is_empty() {
        return Err(SubXError::CommandExecution(
            "No files found to process".to_string(),
        ));
    }
    // The audit keeps both accepted operations and rejected candidates so
    // JSON output can report the full decision set.
    let audit = engine.match_file_list_with_audit(&files).await?;
    let mut operations = audit.operations;
    let rejected = audit.rejected;
    // Subtitles that originate from an archive cannot simply be renamed in
    // place: force a copy into the matched video's directory instead.
    for op in &mut operations {
        if files.archive_origin(&op.subtitle_file.path).is_some() && !op.requires_relocation {
            if let Some(video_dir) = op.video_file.path.parent() {
                op.relocation_target_path = Some(video_dir.join(&op.new_subtitle_name));
                op.requires_relocation = true;
                op.relocation_mode = crate::core::matcher::engine::FileRelocationMode::Copy;
            }
        }
    }
    let json_mode = active_mode().is_json();
    if json_mode {
        // Only take the global lock when we will actually mutate the
        // filesystem; a dry run needs no exclusion.
        let _lock_guard = if !args.dry_run {
            Some(crate::core::lock::acquire_subx_lock().await?)
        } else {
            None
        };
        let outcomes = engine
            .execute_operations_audit(&operations, args.dry_run)
            .await?;
        // Accepted candidates first, then rejected ones (with a reason).
        let mut candidates: Vec<MatchCandidate> =
            Vec::with_capacity(operations.len() + rejected.len());
        for op in &operations {
            candidates.push(MatchCandidate {
                video: op.video_file.path.display().to_string(),
                subtitle: op.subtitle_file.path.display().to_string(),
                // Convert the engine's 0.0-1.0 score back to a clamped 0-100 integer.
                confidence: ((op.confidence * 100.0).round().clamp(0.0, 100.0)) as u8,
                accepted: true,
                reason: None,
            });
        }
        for r in &rejected {
            candidates.push(MatchCandidate {
                video: r.video_path.clone(),
                subtitle: r.subtitle_path.clone(),
                confidence: ((r.confidence * 100.0).round().clamp(0.0, 100.0)) as u8,
                accepted: false,
                reason: Some(r.reason.to_string()),
            });
        }
        let mut op_items: Vec<MatchOpItem> = Vec::with_capacity(operations.len());
        let mut applied_count: usize = 0;
        let mut failed_count: usize = 0;
        // `outcomes` is produced from `operations` above, so a pairwise zip
        // keeps each outcome aligned with its operation.
        for (op, outcome) in operations.iter().zip(outcomes.iter()) {
            let (status, error) = match &outcome.error {
                Some(err) => {
                    failed_count += 1;
                    (
                        "error",
                        Some(MatchItemError {
                            category: err.category.to_string(),
                            code: err.code.to_string(),
                            message: err.message.clone(),
                        }),
                    )
                }
                None => ("ok", None),
            };
            if outcome.applied {
                applied_count += 1;
            }
            op_items.push(MatchOpItem {
                kind: op_kind(op),
                source: op.subtitle_file.path.display().to_string(),
                target: op_target(op),
                applied: outcome.applied,
                status,
                error,
            });
        }
        // If every operation failed (and none applied), surface a command-level
        // error instead of emitting a "success" payload, using the first
        // available error message.
        if !op_items.is_empty() && applied_count == 0 && failed_count == op_items.len() {
            let first_msg = op_items
                .iter()
                .filter_map(|o| o.error.as_ref().map(|e| e.message.clone()))
                .next()
                .unwrap_or_else(|| "All match operations failed".to_string());
            return Err(SubXError::FileOperationFailed(first_msg));
        }
        let summary = MatchSummary {
            total_candidates: candidates.len(),
            accepted: operations.len(),
            applied: applied_count,
            skipped: rejected.len(),
            failed: failed_count,
        };
        let payload = MatchPayload {
            dry_run: args.dry_run,
            confidence_threshold: args.confidence,
            candidates,
            operations: op_items,
            summary,
        };
        emit_success(active_mode(), "match", payload);
        return Ok(());
    }
    // Human-readable path: show the planned operations, then apply them under
    // the global lock unless this is a dry run.
    display_match_results(&operations, args.dry_run);
    if !args.dry_run {
        let _lock = crate::core::lock::acquire_subx_lock().await?;
        engine.execute_operations(&operations, args.dry_run).await?;
    }
    Ok(())
}
/// Matches video files in `directory` using the parallel task scheduler.
///
/// Discovers media files (optionally recursively), schedules one
/// [`FileProcessingTask`] per video file, monitors progress, and prints a
/// per-file success/failure summary. Human-readable output is suppressed in
/// JSON output mode.
///
/// Fixes: the partial-success marker was emitted as mojibake ("âš") instead
/// of the intended "⚠"; the configuration was also fetched twice — it is now
/// loaded once and reused for the progress-bar setting.
pub async fn execute_parallel_match(
    directory: &std::path::Path,
    recursive: bool,
    output: Option<&std::path::Path>,
    config_service: &dyn ConfigService,
) -> Result<()> {
    // Load once; also used below for the progress-bar visibility setting.
    let config = config_service.get_config()?;
    let scheduler = TaskScheduler::new()?;
    let discovery = FileDiscovery::new();
    let files = discovery.scan_directory(directory, recursive)?;
    // One matching task per discovered video file.
    let mut tasks: Vec<Box<dyn Task + Send + Sync>> = Vec::new();
    for f in files
        .iter()
        .filter(|f| matches!(f.file_type, MediaFileType::Video))
    {
        let task = Box::new(FileProcessingTask {
            input_path: f.path.clone(),
            output_path: output.map(|p| p.to_path_buf()),
            operation: ProcessingOperation::MatchFiles { recursive },
        });
        tasks.push(task);
    }
    let json_mode = active_mode().is_json();
    if tasks.is_empty() {
        if !json_mode {
            println!("No video files found to process");
        }
        return Ok(());
    }
    if !json_mode {
        println!("Preparing to process {} files in parallel", tasks.len());
        println!("Max concurrency: {}", scheduler.get_active_workers());
    }
    // Hide the bar (rather than skip creating it) when disabled, so the
    // monitoring loop can update it unconditionally.
    let progress_bar = {
        let pb = create_progress_bar(tasks.len());
        if !config.general.enable_progress_bar {
            pb.set_draw_target(ProgressDrawTarget::hidden());
        }
        pb
    };
    let results = monitor_batch_execution(&scheduler, tasks, &progress_bar).await?;
    // Tally outcomes; cancelled tasks count as failures.
    let (mut ok, mut failed, mut partial) = (0, 0, 0);
    for r in &results {
        match r {
            TaskResult::Success(_) => ok += 1,
            TaskResult::Failed(_) | TaskResult::Cancelled => failed += 1,
            TaskResult::PartialSuccess(_, _) => partial += 1,
        }
    }
    if !json_mode {
        println!("\nProcessing results:");
        println!(" ✓ Success: {ok} files");
        if partial > 0 {
            println!(" ⚠ Partial success: {partial} files");
        }
        if failed > 0 {
            println!(" ✗ Failed: {failed} files");
            for (i, r) in results.iter().enumerate() {
                if matches!(r, TaskResult::Failed(_)) {
                    println!(" Failure details {}: {}", i + 1, r);
                }
            }
        }
    }
    Ok(())
}
/// Awaits each scheduled task while keeping the progress bar current.
///
/// Every task is spawned up front so they run concurrently on the scheduler;
/// this loop then collects results in submission order, racing each join
/// handle against a 500 ms ticker with `select!` so the bar's message
/// (active/queued/completed counts) refreshes while a task is still running.
async fn monitor_batch_execution(
    scheduler: &TaskScheduler,
    tasks: Vec<Box<dyn Task + Send + Sync>>,
    progress_bar: &indicatif::ProgressBar,
) -> Result<Vec<TaskResult>> {
    use tokio::time::{Duration, interval};
    // Submit everything immediately; each JoinHandle resolves to the
    // scheduler's Result for that task.
    let handles: Vec<_> = tasks
        .into_iter()
        .map(|t| {
            let s = scheduler.clone();
            tokio::spawn(async move { s.submit_task(t).await })
        })
        .collect();
    let mut ticker = interval(Duration::from_millis(500));
    let mut completed = 0;
    let total = handles.len();
    let mut results = Vec::new();
    for mut h in handles {
        loop {
            tokio::select! {
                // `&mut h` lets us poll the handle repeatedly across ticks
                // without consuming it.
                res = &mut h => {
                    match res {
                        // Task ran and returned a result.
                        Ok(Ok(r)) => results.push(r),
                        // Task ran but the scheduler reported an error.
                        Ok(Err(_)) => results.push(TaskResult::Failed("Task execution error".into())),
                        // The spawned future panicked or was aborted.
                        Err(_) => results.push(TaskResult::Cancelled),
                    }
                    completed += 1;
                    progress_bar.set_position(completed);
                    break;
                }
                _ = ticker.tick() => {
                    let active = scheduler.list_active_tasks().len();
                    let queued = scheduler.get_queue_size();
                    progress_bar.set_message(format!("Active: {active} | Queued: {queued} | Completed: {completed}/{total}"));
                }
            }
        }
    }
    progress_bar.finish_with_message("All tasks completed");
    Ok(results)
}
/// Builds the progress bar used for parallel batch matching: spinner,
/// elapsed time, a 40-column bar, position/length, and a free-form message.
fn create_progress_bar(total: usize) -> indicatif::ProgressBar {
    use indicatif::ProgressStyle;
    let style = ProgressStyle::default_bar()
        .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}")
        .unwrap()
        .progress_chars("#>-");
    let bar = indicatif::ProgressBar::new(total as u64);
    bar.set_style(style);
    bar
}
#[cfg(test)]
mod tests {
    use super::{execute_parallel_match, execute_with_client};
    use crate::cli::MatchArgs;
    use crate::config::{ConfigService, TestConfigBuilder, TestConfigService};
    use crate::services::ai::{
        AIProvider, AnalysisRequest, ConfidenceScore, MatchResult, VerificationRequest,
    };
    use async_trait::async_trait;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::tempdir;

    /// Stub AI provider: returns an empty match set, and panics if the
    /// verification path is ever reached (a dry run must not get that far).
    struct DummyAI;

    #[async_trait]
    impl AIProvider for DummyAI {
        async fn analyze_content(&self, _req: AnalysisRequest) -> crate::Result<MatchResult> {
            Ok(MatchResult {
                matches: Vec::new(),
                confidence: 0.0,
                reasoning: String::new(),
            })
        }
        async fn verify_match(&self, _req: VerificationRequest) -> crate::Result<ConfidenceScore> {
            panic!("verify_match should not be called in dry-run test");
        }
    }

    /// A dry run must leave both media files untouched on disk, even when the
    /// match itself errors in this isolated environment.
    #[tokio::test]
    async fn dry_run_creates_cache_and_skips_execute_operations() -> crate::Result<()> {
        let media_dir = tempdir()?;
        let media_path = media_dir.path().join("media");
        fs::create_dir_all(&media_path)?;
        let video = media_path.join("video.mkv");
        let subtitle = media_path.join("subtitle.ass");
        fs::write(&video, b"dummy")?;
        fs::write(&subtitle, b"dummy")?;
        let args = MatchArgs {
            path: Some(PathBuf::from(&media_path)),
            input_paths: Vec::new(),
            dry_run: true,
            recursive: false,
            confidence: 80,
            backup: false,
            copy: false,
            move_files: false,
            no_extract: false,
        };
        let config = TestConfigBuilder::new().build_config();
        let result = execute_with_client(args, Box::new(DummyAI), &config).await;
        // A failure here is tolerated (no real AI backend available); only the
        // filesystem invariants below are asserted.
        if result.is_err() {
            println!("Test completed with expected limitations in isolated environment");
        }
        assert!(
            video.exists(),
            "dry_run should not execute operations, video file should still exist"
        );
        assert!(
            subtitle.exists(),
            "dry_run should not execute operations, subtitle file should still exist"
        );
        Ok(())
    }

    /// An empty directory is not an error for the parallel match path.
    #[tokio::test]
    async fn test_execute_parallel_match_no_files() -> crate::Result<()> {
        let temp_dir = tempdir()?;
        let config_service = TestConfigBuilder::new().build_service();
        // `path()` already yields `&Path`; the previous `&temp_dir.path()`
        // passed a needless double reference.
        let result = execute_parallel_match(temp_dir.path(), false, None, &config_service).await;
        assert!(result.is_ok());
        Ok(())
    }

    /// Isolated test configs must round-trip AI provider/model settings.
    #[tokio::test]
    async fn test_match_with_isolated_config() -> crate::Result<()> {
        let config = TestConfigBuilder::new()
            .with_ai_provider("openai")
            .with_ai_model("gpt-4.1")
            .build_config();
        let config_service = Arc::new(TestConfigService::new(config));
        let loaded_config = config_service.get_config()?;
        assert_eq!(loaded_config.ai.provider, "openai");
        assert_eq!(loaded_config.ai.model, "gpt-4.1");
        Ok(())
    }
}