1use anyhow::Result;
9use clap::{Parser, Subcommand};
10use colored::Colorize;
11use raps_kernel::progress;
12use std::path::PathBuf;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::fs;
17use tokio::sync::Semaphore;
18use tokio::time::sleep;
19
20use raps_derivative::{DerivativeClient, OutputFormat};
21use raps_kernel::auth::AuthClient;
22use raps_kernel::config::Config;
23use raps_oss::{OssClient, Region, RetentionPolicy};
24
#[derive(Subcommand)]
pub enum DemoCommands {
    /// End-to-end bucket lifecycle: create, list, upload, inspect, clean up.
    BucketLifecycle(BucketLifecycleArgs),

    /// Full single-model pipeline: upload, translate, poll the manifest.
    ModelPipeline(ModelPipelineArgs),

    /// Explore BIM 360 / ACC hubs and projects (requires 3-legged auth).
    DataManagement(DataManagementArgs),

    /// Batch-translate a folder of models with bounded parallelism.
    BatchProcessing(BatchProcessingArgs),
}
39
/// Arguments for the bucket-lifecycle demo.
#[derive(Parser)]
pub struct BucketLifecycleArgs {
    /// Name prefix for the demo buckets (default: timestamped `demo-…`).
    #[arg(long, default_value_t = format!("demo-{}", chrono::Utc::now().timestamp_millis()))]
    prefix: String,

    /// Leave the demo buckets and generated temp files in place afterwards.
    #[arg(long)]
    skip_cleanup: bool,
}
50
/// Arguments for the single-model pipeline demo.
#[derive(Parser)]
pub struct ModelPipelineArgs {
    /// Model file to upload; a synthetic OBJ cube is generated if omitted.
    #[arg(short, long)]
    file: Option<PathBuf>,

    /// Target bucket key; a timestamped `pipeline-…` bucket is used if omitted.
    #[arg(long)]
    bucket: Option<String>,

    /// Model Derivative output format (unknown values fall back to SVF2).
    #[arg(long, default_value = "svf2")]
    format: String,

    /// Keep the bucket and uploaded object instead of cleaning up.
    #[arg(long)]
    keep_bucket: bool,
}
69
/// Arguments for the Data Management explorer demo.
#[derive(Parser)]
pub struct DataManagementArgs {
    /// Skip the interactive-exploration hints at the end of the run.
    #[arg(long)]
    non_interactive: bool,

    /// Write the collected hub data to this JSON file.
    #[arg(long)]
    export: Option<PathBuf>,
}
80
/// Arguments for the batch-translation demo.
#[derive(Parser)]
pub struct BatchProcessingArgs {
    /// Folder of model files to process; synthetic OBJs are generated if omitted.
    #[arg(short, long)]
    input: Option<PathBuf>,

    /// Upper bound on concurrent upload/translate jobs (further capped by the
    /// global concurrency setting).
    #[arg(long, default_value = "3")]
    max_parallel: usize,

    /// Bucket key; a timestamped `batch-…` bucket is used if omitted.
    #[arg(long)]
    bucket_prefix: Option<String>,

    /// Model Derivative output format (unknown values fall back to SVF2).
    #[arg(long, default_value = "svf2")]
    format: String,

    /// Leave the bucket, objects, and generated temp files in place afterwards.
    #[arg(long)]
    skip_cleanup: bool,
}
103
104impl DemoCommands {
105 pub async fn execute(&self, concurrency: usize) -> Result<()> {
106 match self {
107 DemoCommands::BucketLifecycle(args) => bucket_lifecycle(args).await,
108 DemoCommands::ModelPipeline(args) => model_pipeline(args).await,
109 DemoCommands::DataManagement(args) => data_management(args).await,
110 DemoCommands::BatchProcessing(args) => batch_processing(args, concurrency).await,
111 }
112 }
113}
114
/// Runs the bucket-lifecycle demo: creates buckets across regions and
/// retention policies, lists buckets, uploads generated JSON test files,
/// lists their objects, and (unless `--skip-cleanup`) deletes everything.
async fn bucket_lifecycle(args: &BucketLifecycleArgs) -> Result<()> {
    let config = Config::from_env()?;
    let auth = AuthClient::new(config.clone());
    let oss = OssClient::new(config.clone(), auth);

    println!("\n{}", "═".repeat(60).cyan());
    println!("{}", " APS Bucket Lifecycle Demo".cyan().bold());
    println!("{}", "═".repeat(60).cyan());
    println!("Prefix: {}", args.prefix.dimmed());

    // Buckets we actually created (or found already existing); later steps
    // and cleanup only touch these.
    let mut created_buckets: Vec<String> = Vec::new();

    println!("\n{}", "[1/5] Creating buckets...".yellow());

    // (name, region, retention policy) triples exercising both regions.
    let buckets = vec![
        (format!("{}-us-transient", args.prefix), "US", "transient"),
        (format!("{}-us-temporary", args.prefix), "US", "temporary"),
        (
            format!("{}-emea-persistent", args.prefix),
            "EMEA",
            "persistent",
        ),
    ];

    for (name, region, policy) in &buckets {
        print!(" Creating {} in {}...", name, region);

        let region_enum = match region.to_uppercase().as_str() {
            "EMEA" => Region::EMEA,
            _ => Region::US,
        };

        // Unrecognized policy strings silently fall back to Transient.
        let policy_enum = RetentionPolicy::from_str(policy).unwrap_or(RetentionPolicy::Transient);

        match oss.create_bucket(name, policy_enum, region_enum).await {
            Ok(_) => {
                println!(" {}", "OK".green());
                created_buckets.push(name.clone());
            }
            Err(e) => {
                // Re-running with the same prefix is fine: an existing bucket
                // is treated as usable rather than as a failure.
                if e.to_string().contains("already exists") {
                    println!(" {}", "SKIP (exists)".yellow());
                    created_buckets.push(name.clone());
                } else {
                    println!(" {}: {}", "FAILED".red(), e);
                }
            }
        }
    }

    println!("\n{}", "[2/5] Listing buckets...".yellow());
    match oss.list_buckets().await {
        Ok(buckets) => {
            println!(" Found {} buckets", buckets.len());
            // Cap the listing at 10 entries to keep the demo output short.
            for bucket in buckets.iter().take(10) {
                println!(" - {} ({})", bucket.bucket_key, bucket.policy_key);
            }
            if buckets.len() > 10 {
                println!(" ... and {} more", buckets.len() - 10);
            }
        }
        Err(e) => println!(" {}: {}", "Error".red(), e),
    }

    println!(
        "\n{}",
        "[3/5] Generating and uploading test files...".yellow()
    );

    let temp_dir = std::env::temp_dir().join("aps-demo-files");
    fs::create_dir_all(&temp_dir).await?;

    let mut test_files: Vec<PathBuf> = Vec::new();

    // Generate three small JSON "models" with randomized element counts.
    for i in 1..=3 {
        let file_name = format!("test-model-{}.json", i);
        let file_path = temp_dir.join(&file_name);

        let content = serde_json::json!({
            "id": uuid::Uuid::new_v4().to_string(),
            "name": format!("Test Model {}", i),
            "created": chrono::Utc::now().to_rfc3339(),
            "elements": [
                { "type": "Wall", "count": rand::random::<u32>() % 400 + 100 },
                { "type": "Door", "count": rand::random::<u32>() % 80 + 20 },
                { "type": "Window", "count": rand::random::<u32>() % 120 + 30 }
            ],
            "metadata": {
                "author": "Demo Script (Rust)",
                "version": format!("1.0.{}", i)
            }
        });

        fs::write(&file_path, serde_json::to_string_pretty(&content)?).await?;
        let size = fs::metadata(&file_path).await?.len();
        println!(" Generated: {} ({} bytes)", file_name, size);
        test_files.push(file_path);
    }

    // All test files go into the first bucket only; if bucket creation failed
    // entirely, the upload step is skipped.
    if let Some(target_bucket) = created_buckets.first() {
        println!("\n Uploading to bucket: {}", target_bucket.dimmed());

        for file_path in &test_files {
            let file_name = file_path.file_name().unwrap_or_default().to_string_lossy();
            print!(" Uploading {}...", file_name);

            match oss
                .upload_object(target_bucket, &file_name, file_path)
                .await
            {
                Ok(_) => println!(" {}", "OK".green()),
                Err(e) => println!(" {}: {}", "ERROR".red(), e),
            }
        }
    }

    println!("\n{}", "[4/5] Listing objects in buckets...".yellow());
    for bucket_name in &created_buckets {
        println!("\n Bucket: {}", bucket_name.dimmed());
        match oss.list_objects(bucket_name).await {
            Ok(objects) => {
                if objects.is_empty() {
                    println!(" (empty)");
                } else {
                    for obj in &objects {
                        println!(" - {} ({} bytes)", obj.object_key, obj.size);
                    }
                }
            }
            Err(e) => println!(" {}: {}", "Error".red(), e),
        }
    }

    if !args.skip_cleanup {
        println!("\n{}", "[5/5] Cleaning up...".yellow());

        for bucket_name in &created_buckets {
            // Buckets must be emptied before deletion; per-object delete
            // failures are deliberately ignored (best-effort cleanup).
            print!(" Deleting objects in {}...", bucket_name);
            if let Ok(objects) = oss.list_objects(bucket_name).await {
                for obj in objects {
                    let _ = oss.delete_object(bucket_name, &obj.object_key).await;
                }
            }
            println!(" done");

            print!(" Deleting bucket {}...", bucket_name);
            match oss.delete_bucket(bucket_name).await {
                Ok(_) => println!(" {}", "OK".green()),
                Err(_) => println!(" {}", "FAILED".yellow()),
            }
        }

        // Remove the locally generated test files as well; best-effort.
        let _ = fs::remove_dir_all(&temp_dir).await;
    } else {
        println!("\n{}", "[5/5] Cleanup skipped (--skip-cleanup)".dimmed());
    }

    println!("\n{}", "═".repeat(60).cyan());
    println!("{}", " Demo Complete".cyan().bold());
    println!("{}", "═".repeat(60).cyan());
    println!("Created buckets: {}", created_buckets.join(", "));

    Ok(())
}
292
/// Runs the single-model pipeline demo: ensure bucket → upload → submit
/// translation → poll the manifest (up to 10 minutes) → print a summary →
/// optional cleanup. With no `--file`, a synthetic OBJ cube is generated.
async fn model_pipeline(args: &ModelPipelineArgs) -> Result<()> {
    let config = Config::from_env()?;
    let auth = AuthClient::new(config.clone());
    let oss = OssClient::new(config.clone(), auth.clone());
    let derivative = DerivativeClient::new(config.clone(), auth);

    println!(
        "\n{}",
        "╔══════════════════════════════════════════════════════════════╗".cyan()
    );
    println!(
        "{}",
        "║ APS Model Processing Pipeline ║".cyan()
    );
    println!(
        "{}",
        "╚══════════════════════════════════════════════════════════════╝".cyan()
    );

    let bucket_key = args
        .bucket
        .clone()
        .unwrap_or_else(|| format!("pipeline-{}", chrono::Utc::now().timestamp_millis()));

    // Use the caller-supplied file, or generate a small OBJ cube to translate.
    let file_path = if let Some(ref path) = args.file {
        path.clone()
    } else {
        println!("\nNo file specified, creating synthetic test file...");
        let temp_dir = std::env::temp_dir().join("aps-pipeline-demo");
        fs::create_dir_all(&temp_dir).await?;

        let file_path = temp_dir.join("test-cube.obj");
        let obj_content = r"# Simple Cube OBJ
# Generated by APS Demo Pipeline (Rust)

# Vertices
v -1.0 -1.0 1.0
v 1.0 -1.0 1.0
v 1.0 1.0 1.0
v -1.0 1.0 1.0
v -1.0 -1.0 -1.0
v 1.0 -1.0 -1.0
v 1.0 1.0 -1.0
v -1.0 1.0 -1.0

# Normals
vn 0.0 0.0 1.0
vn 0.0 0.0 -1.0
vn 0.0 1.0 0.0
vn 0.0 -1.0 0.0
vn 1.0 0.0 0.0
vn -1.0 0.0 0.0

# Faces
f 1//1 2//1 3//1 4//1
f 8//2 7//2 6//2 5//2
f 4//3 3//3 7//3 8//3
f 5//4 6//4 2//4 1//4
f 2//5 6//5 7//5 3//5
f 5//6 1//6 4//6 8//6
";
        fs::write(&file_path, obj_content).await?;
        println!(" Created: {}", file_path.display());
        file_path
    };

    let file_name = file_path
        .file_name()
        .unwrap_or_default()
        .to_string_lossy()
        .to_string();
    let file_size = fs::metadata(&file_path).await?.len();

    println!(
        "\nFile: {} ({:.2} KB)",
        file_name,
        file_size as f64 / 1024.0
    );
    println!("Bucket: {}", bucket_key);
    println!("Format: {}", args.format);

    println!("\n{}", "[1/5] Creating bucket...".yellow());
    match oss
        .create_bucket(&bucket_key, RetentionPolicy::Transient, Region::US)
        .await
    {
        Ok(_) => println!(" Bucket created successfully"),
        Err(e) => {
            // An existing bucket is fine for the demo; other errors are only
            // warned about — the upload step will surface any real failure.
            if e.to_string().contains("already exists") {
                println!(" Bucket already exists, continuing...");
            } else {
                println!(" Warning: {e}");
            }
        }
    }

    println!("\n{}", "[2/5] Uploading file...".yellow());
    let upload_start = Instant::now();
    oss.upload_object(&bucket_key, &file_name, &file_path)
        .await?;
    println!(
        " Upload completed in {:.2}s",
        upload_start.elapsed().as_secs_f64()
    );

    // URN the Model Derivative API uses to address the uploaded object.
    let urn = oss.get_urn(&bucket_key, &file_name);
    println!(" URN: {}", urn.dimmed());

    println!("\n{}", "[3/5] Starting translation...".yellow());
    // Unrecognized --format values fall back to SVF2.
    let output_format = OutputFormat::from_str(&args.format).unwrap_or(OutputFormat::Svf2);
    match derivative
        .translate(
            &urn,
            output_format,
            None,
            raps_derivative::MdRegion::default(),
            false,
        )
        .await
    {
        Ok(_) => println!(" Translation job submitted"),
        Err(e) => println!(" Translation request: {}", e),
    }

    println!("\n{}", "[4/5] Monitoring translation progress...".yellow());
    // Poll the manifest every 3s until a terminal status, up to 10 minutes.
    let start_time = Instant::now();
    let max_wait = Duration::from_secs(600);
    let pb = progress::spinner("Waiting for translation...");

    loop {
        if start_time.elapsed() > max_wait {
            pb.finish_with_message("Timeout after 10 minutes");
            break;
        }

        match derivative.get_manifest(&urn).await {
            Ok(manifest) => {
                let status = manifest.status.to_lowercase();
                if status.contains("success") || status.contains("complete") {
                    pb.finish_with_message(format!("{} Translation complete!", "✓".green()));
                    break;
                }
                if status.contains("failed") {
                    pb.finish_with_message(format!("{} Translation failed", "✗".red()));
                    break;
                }

                pb.set_message(format!(
                    "Status: {} ({}s)",
                    status,
                    start_time.elapsed().as_secs()
                ));
            }
            Err(_) => {
                // The manifest may not exist yet right after submission;
                // keep waiting rather than treating this as fatal.
                pb.set_message(format!("Waiting... ({}s)", start_time.elapsed().as_secs()));
            }
        }

        sleep(Duration::from_secs(3)).await;
    }

    println!("\n{}", "[5/5] Fetching manifest...".yellow());
    match derivative.get_manifest(&urn).await {
        Ok(manifest) => {
            println!(" Manifest retrieved successfully");
            println!("\n--- Manifest Preview ---");
            println!(" Status: {}", manifest.status);
            println!(" Progress: {}", manifest.progress);
            if !manifest.derivatives.is_empty() {
                println!(" Derivatives:");
                for d in manifest.derivatives.iter().take(5) {
                    println!(" - {} ({})", d.output_type, d.status);
                }
            }
        }
        Err(e) => {
            println!(" Could not retrieve manifest: {}", e);
        }
    }

    println!(
        "\n{}",
        "╔══════════════════════════════════════════════════════════════╗".cyan()
    );
    println!(
        "{}",
        "║ Pipeline Summary ║".cyan()
    );
    println!(
        "{}",
        "╚══════════════════════════════════════════════════════════════╝".cyan()
    );
    println!(" File: {}", file_name);
    println!(" Bucket: {}", bucket_key);
    println!(" URN: {}", urn);
    println!(" Format: {}", args.format);

    if !args.keep_bucket {
        println!("\nCleaning up bucket...");
        // Best-effort: cleanup failures are deliberately ignored.
        let _ = oss.delete_object(&bucket_key, &file_name).await;
        let _ = oss.delete_bucket(&bucket_key).await;
        println!(" Cleanup complete");
    } else {
        println!("\nBucket preserved (--keep-bucket specified)");
    }

    println!("\n{}", "=== Pipeline Complete ===".cyan());

    Ok(())
}
517
/// Explores BIM 360 / ACC data: verifies a 3-legged token, lists hubs and a
/// sample of projects per hub, and optionally exports the collected hub data
/// to a JSON file (`--export`).
async fn data_management(args: &DataManagementArgs) -> Result<()> {
    let config = Config::from_env()?;
    let auth = AuthClient::new(config.clone());

    println!(
        "\n{}",
        "╔══════════════════════════════════════════════════════════════╗".cyan()
    );
    println!(
        "{}",
        "║ BIM 360 / ACC Data Management Explorer ║".cyan()
    );
    println!(
        "{}",
        "╚══════════════════════════════════════════════════════════════╝".cyan()
    );

    println!("\n{}", "Checking authentication...".yellow());

    // Data Management requires user context: exit early (successfully, with a
    // hint) when no 3-legged token is available.
    let token = match auth.get_3leg_token().await {
        Ok(t) => {
            println!(" {} Authenticated (3-legged)", "✓".green());
            t
        }
        Err(_) => {
            println!(" {} 3-legged authentication required", "✗".red());
            println!(" Run: raps auth login");
            return Ok(());
        }
    };

    let client = reqwest::Client::new();

    println!("\n{}", "[1/3] Fetching Hubs...".yellow());

    let hubs_response = client
        .get(format!("{}/hubs", config.project_url()))
        .bearer_auth(&token)
        .send()
        .await?;

    // Accumulates everything displayed, for the optional --export file.
    let mut export_data = serde_json::json!({
        "timestamp": chrono::Utc::now().to_rfc3339(),
        "hubs": []
    });

    if hubs_response.status().is_success() {
        let hubs: serde_json::Value = hubs_response.json().await?;

        // JSON:API shape: hubs live under the top-level "data" array.
        if let Some(data) = hubs.get("data").and_then(|d| d.as_array()) {
            println!(" Found {} hubs:", data.len());

            for hub in data {
                let id = hub.get("id").and_then(|v| v.as_str()).unwrap_or("unknown");
                let name = hub
                    .get("attributes")
                    .and_then(|a| a.get("name"))
                    .and_then(|n| n.as_str())
                    .unwrap_or("unnamed");

                println!(" - {} ({})", name.green(), id.dimmed());

                // expect() is safe: "hubs" was initialized as an array above.
                export_data["hubs"]
                    .as_array_mut()
                    .expect("expected JSON array in demo data")
                    .push(serde_json::json!({
                        "id": id,
                        "name": name
                    }));
            }
        }
    } else {
        println!(" Failed to fetch hubs: {}", hubs_response.status());
    }

    println!("\n{}", "[2/3] Sample Projects...".yellow());

    // Show up to 5 projects for each of the first 2 hubs, to keep output short.
    if let Some(hubs) = export_data["hubs"].as_array() {
        for hub in hubs.iter().take(2) {
            if let Some(hub_id) = hub["id"].as_str() {
                println!("\n Hub: {}", hub["name"].as_str().unwrap_or("?"));

                let projects_response = client
                    .get(format!("{}/hubs/{}/projects", config.project_url(), hub_id))
                    .bearer_auth(&token)
                    .send()
                    .await?;

                if projects_response.status().is_success() {
                    let projects: serde_json::Value = projects_response.json().await?;

                    if let Some(data) = projects.get("data").and_then(|d| d.as_array()) {
                        for project in data.iter().take(5) {
                            let name = project
                                .get("attributes")
                                .and_then(|a| a.get("name"))
                                .and_then(|n| n.as_str())
                                .unwrap_or("unnamed");
                            let id = project.get("id").and_then(|v| v.as_str()).unwrap_or("?");
                            println!(" - {} ({})", name, id.dimmed());
                        }
                        if data.len() > 5 {
                            println!(" ... and {} more", data.len() - 5);
                        }
                    }
                }
            }
        }
    }

    if !args.non_interactive {
        println!("\n{}", "[3/3] Interactive Exploration".yellow());
        println!(" (Use raps commands for folder navigation)");
        println!("\n Example commands:");
        println!(" raps hub list");
        println!(" raps project list --hub-id <hub_id>");
    }

    if let Some(ref export_path) = args.export {
        println!(
            "\n{}",
            format!("Exporting data to: {}", export_path.display()).yellow()
        );
        fs::write(export_path, serde_json::to_string_pretty(&export_data)?).await?;
        println!(" Export complete");
    }

    println!("\n{}", "=== Exploration Complete ===".cyan());

    Ok(())
}
658
659async fn batch_processing(args: &BatchProcessingArgs, concurrency: usize) -> Result<()> {
664 let config = Config::from_env()?;
665 let auth = AuthClient::new(config.clone());
666 let oss = OssClient::new(config.clone(), auth.clone());
667 let derivative = DerivativeClient::new(config.clone(), auth);
668
669 println!(
670 "\n{}",
671 "╔══════════════════════════════════════════════════════════════╗".cyan()
672 );
673 println!(
674 "{}",
675 "║ APS Batch Translation Pipeline ║".cyan()
676 );
677 println!(
678 "{}",
679 "╚══════════════════════════════════════════════════════════════╝".cyan()
680 );
681
682 let bucket_prefix = args
683 .bucket_prefix
684 .clone()
685 .unwrap_or_else(|| format!("batch-{}", chrono::Utc::now().timestamp_millis()));
686
687 let input_folder = if let Some(ref path) = args.input {
689 path.clone()
690 } else {
691 println!("\nNo input folder specified, generating synthetic test files...");
692
693 let temp_dir = std::env::temp_dir().join("aps-batch-demo");
694 fs::create_dir_all(&temp_dir).await?;
695
696 let shapes = vec![
698 (
699 "cube",
700 vec![
701 "-1 -1 1", "1 -1 1", "1 1 1", "-1 1 1", "-1 -1 -1", "1 -1 -1", "1 1 -1",
702 "-1 1 -1",
703 ],
704 ),
705 (
706 "pyramid",
707 vec!["0 1 0", "-1 -1 1", "1 -1 1", "1 -1 -1", "-1 -1 -1"],
708 ),
709 (
710 "wedge",
711 vec![
712 "-1 -1 1", "1 -1 1", "1 1 1", "-1 -1 -1", "1 -1 -1", "1 1 -1",
713 ],
714 ),
715 ];
716
717 for (name, vertices) in shapes {
718 let file_path = temp_dir.join(format!("{}.obj", name));
719 let mut content = format!("# {} OBJ\n# Generated for batch demo\n\n", name);
720 for v in vertices {
721 content.push_str(&format!("v {}\n", v));
722 }
723 content.push_str("\n# Faces (simplified)\nf 1 2 3\nf 1 3 4\n");
724 fs::write(&file_path, &content).await?;
725 println!(" Generated: {}.obj", name);
726 }
727
728 temp_dir
729 };
730
731 let supported_extensions = vec![
733 "obj", "fbx", "dwg", "dxf", "ifc", "rvt", "rfa", "nwd", "nwc", "stp", "step", "iges", "igs",
734 ];
735
736 let mut files: Vec<PathBuf> = Vec::new();
737 let mut entries = fs::read_dir(&input_folder).await?;
738
739 while let Some(entry) = entries.next_entry().await? {
740 let path = entry.path();
741 if path.is_file()
742 && let Some(ext) = path.extension()
743 && supported_extensions.contains(&ext.to_string_lossy().to_lowercase().as_str())
744 {
745 files.push(path);
746 }
747 }
748
749 if files.is_empty() {
750 println!("\n{}", "No supported model files found.".yellow());
751 println!("Supported extensions: {}", supported_extensions.join(", "));
752 return Ok(());
753 }
754
755 println!("\nFound {} files to process:", files.len());
756 for file in &files {
757 let size = fs::metadata(file).await?.len();
758 println!(
759 " - {} ({:.2} KB)",
760 file.file_name().unwrap_or_default().to_string_lossy(),
761 size as f64 / 1024.0
762 );
763 }
764
765 println!(
767 "\n{}",
768 format!("[1/4] Creating bucket: {}", bucket_prefix).yellow()
769 );
770 let _ = oss
771 .create_bucket(&bucket_prefix, RetentionPolicy::Transient, Region::US)
772 .await;
773 println!(" Bucket ready");
774
775 println!(
777 "\n{}",
778 "[2/4] Uploading and starting translations...".yellow()
779 );
780
781 #[derive(Debug)]
782 struct Job {
783 file: String,
784 urn: String,
785 status: String,
786 start_time: Instant,
787 end_time: Option<Instant>,
788 }
789
790 let mut jobs: Vec<Job> = Vec::new();
791 let output_format = OutputFormat::from_str(&args.format).unwrap_or(OutputFormat::Svf2);
792
793 let max_parallel = concurrency.min(args.max_parallel);
795 let semaphore = Arc::new(Semaphore::new(max_parallel));
796 let mut handles = Vec::new();
797
798 println!(
799 "\n Processing {} files with concurrency limit of {}...",
800 files.len(),
801 max_parallel
802 );
803
804 let oss = Arc::new(oss);
806 let derivative = Arc::new(derivative);
807
808 for file in &files {
810 let file_name = file
811 .file_name()
812 .unwrap_or_default()
813 .to_string_lossy()
814 .to_string();
815 let file_path = file.clone();
816 let bucket_prefix_clone = bucket_prefix.clone();
817 let oss_clone = oss.clone();
818 let derivative_clone = derivative.clone();
819 let semaphore_clone = semaphore.clone();
820 let output_format_clone = output_format;
821
822 let handle = tokio::spawn(async move {
823 let _permit = semaphore_clone
825 .acquire()
826 .await
827 .expect("semaphore closed unexpectedly");
828
829 print!(" Processing: {}...", file_name);
830
831 let result: Result<Job, anyhow::Error> = match oss_clone
832 .upload_object(&bucket_prefix_clone, &file_name, &file_path)
833 .await
834 {
835 Ok(_) => {
836 let urn = oss_clone.get_urn(&bucket_prefix_clone, &file_name);
837
838 match derivative_clone
839 .translate(
840 &urn,
841 output_format_clone,
842 None,
843 raps_derivative::MdRegion::default(),
844 false,
845 )
846 .await
847 {
848 Ok(_) => {
849 println!(" {}", "submitted".green());
850 Ok(Job {
851 file: file_name,
852 urn,
853 status: "submitted".to_string(),
854 start_time: Instant::now(),
855 end_time: None,
856 })
857 }
858 Err(e) => {
859 println!(" {}", "translate failed".red());
860 Ok(Job {
861 file: file_name,
862 urn,
863 status: format!("translate_failed: {}", e),
864 start_time: Instant::now(),
865 end_time: Some(Instant::now()),
866 })
867 }
868 }
869 }
870 Err(e) => {
871 println!(" {}", "upload failed".red());
872 Ok(Job {
873 file: file_name,
874 urn: String::new(),
875 status: format!("upload_failed: {}", e),
876 start_time: Instant::now(),
877 end_time: Some(Instant::now()),
878 })
879 }
880 };
881
882 result
884 });
885
886 handles.push(handle);
887 }
888
889 for handle in handles {
891 match handle.await {
892 Ok(Ok(job)) => jobs.push(job),
893 Ok(Err(e)) => {
894 eprintln!(" Error processing file: {}", e);
895 }
896 Err(e) => {
897 eprintln!(" Task panicked: {}", e);
898 }
899 }
900 }
901
902 println!("\n{}", "[3/4] Monitoring translation progress...".yellow());
904 let start_time = Instant::now();
905 let max_wait = Duration::from_secs(900); loop {
908 if start_time.elapsed() > max_wait {
909 println!("\n Timeout after 15 minutes");
910 break;
911 }
912
913 let mut pending = 0;
914 let mut completed = 0;
915 let mut failed = 0;
916
917 for job in &mut jobs {
918 if job.status == "submitted"
919 && let Ok(manifest) = derivative.get_manifest(&job.urn).await
920 {
921 let status = manifest.status.to_lowercase();
922 if status.contains("success") || status.contains("complete") {
923 job.status = "complete".to_string();
924 job.end_time = Some(Instant::now());
925 } else if status.contains("failed") {
926 job.status = "failed".to_string();
927 job.end_time = Some(Instant::now());
928 }
929 }
930
931 match job.status.as_str() {
932 "complete" => completed += 1,
933 "submitted" => pending += 1,
934 _ => failed += 1,
935 }
936 }
937
938 print!(
939 "\r Progress: {} complete, {} failed, {} pending ({}s) ",
940 completed,
941 failed,
942 pending,
943 start_time.elapsed().as_secs()
944 );
945
946 if pending == 0 {
947 println!();
948 break;
949 }
950
951 sleep(Duration::from_secs(5)).await;
952 }
953
954 println!("\n{}", "[4/4] Results Summary".yellow());
956 println!("\n ╔═══════════════════════════════════════════════════════════╗");
957 println!(" ║ File Status Duration ║");
958 println!(" ╠═══════════════════════════════════════════════════════════╣");
959
960 let mut completed_count = 0;
961 let mut failed_count = 0;
962
963 for job in &jobs {
964 let duration = job
965 .end_time
966 .map(|e| format!("{:.1}s", (e - job.start_time).as_secs_f64()))
967 .unwrap_or_else(|| "-".to_string());
968
969 let file_display = if job.file.len() > 28 {
970 format!("{}...", &job.file[..25])
971 } else {
972 format!("{:28}", job.file)
973 };
974
975 let (status_display, color) = match job.status.as_str() {
976 "complete" => {
977 completed_count += 1;
978 ("complete ".to_string(), "green")
979 }
980 "submitted" => ("pending ".to_string(), "yellow"),
981 _ => {
982 failed_count += 1;
983 ("failed ".to_string(), "red")
984 }
985 };
986
987 let line = format!(
988 " ║ {} {} {:12} ║",
989 file_display, status_display, duration
990 );
991 match color {
992 "green" => println!("{}", line.green()),
993 "red" => println!("{}", line.red()),
994 _ => println!("{}", line.yellow()),
995 }
996 }
997
998 println!(" ╚═══════════════════════════════════════════════════════════╝");
999
1000 let total_time = start_time.elapsed();
1002 println!("\n Statistics:");
1003 println!(" Total files: {}", files.len());
1004 println!(
1005 " Completed: {}",
1006 format!("{}", completed_count).green()
1007 );
1008 if failed_count > 0 {
1009 println!(" Failed: {}", format!("{}", failed_count).red());
1010 } else {
1011 println!(" Failed: 0");
1012 }
1013 println!(" Total time: {:.1}s", total_time.as_secs_f64());
1014 println!(
1015 " Avg per file: {:.1}s",
1016 total_time.as_secs_f64() / files.len().max(1) as f64
1017 );
1018
1019 if !args.skip_cleanup {
1021 println!("\n{}", "Cleaning up...".yellow());
1022 for file in &files {
1023 let file_name = file.file_name().unwrap_or_default().to_string_lossy();
1024 let _ = oss.delete_object(&bucket_prefix, &file_name).await;
1025 }
1026 let _ = oss.delete_bucket(&bucket_prefix).await;
1027
1028 if args.input.is_none() {
1030 let _ = fs::remove_dir_all(std::env::temp_dir().join("aps-batch-demo")).await;
1031 }
1032
1033 println!(" Cleanup complete");
1034 }
1035
1036 println!("\n{}", "=== Batch Processing Complete ===".cyan());
1037
1038 Ok(())
1039}