use docker_wrapper::command::DockerCommand;
use docker_wrapper::{BuildCommand, LogsCommand, RunCommand};
use docker_wrapper::{OutputLine, StreamHandler, StreamableCommand};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Entry point: prints a banner, then runs each streaming example in order.
///
/// # Errors
///
/// Propagates the first error returned by any example; later examples are
/// not run once one has failed.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Banner.
    println!("Docker Streaming Example");
    println!("========================\n");

    // The examples run strictly one after another.
    example_build_streaming().await?;
    example_run_streaming().await?;
    example_logs_filtering().await?;
    example_channel_streaming().await?;

    println!("\n✨ All streaming examples completed!");
    Ok(())
}
/// Example 1: builds a small throwaway image and streams the build output.
///
/// A temporary `Dockerfile.streaming` is written to the current working
/// directory, used for the build, and removed afterwards. The removal is
/// attempted even when the build command returns an error, so a failed build
/// does not leave the file behind.
///
/// # Errors
///
/// Returns an error if the Dockerfile cannot be written, if the streamed
/// build command returns an error, or if the temporary Dockerfile cannot be
/// removed after the build command has completed.
async fn example_build_streaming() -> Result<(), Box<dyn std::error::Error>> {
    // Single source of truth for the temporary file's path.
    const DOCKERFILE: &str = "Dockerfile.streaming";

    println!("📦 Example 1: Streaming Docker Build Output");
    println!("-------------------------------------------");

    std::fs::write(
        DOCKERFILE,
        r#"
FROM alpine:latest
RUN echo "Step 1: Installing packages..."
RUN echo "Step 2: Setting up application..."
RUN echo "Step 3: Configuring environment..."
CMD ["echo", "Build complete!"]
"#,
    )?;

    println!("Building image with streaming output...\n");

    // Deliberately no `?` here: the temporary Dockerfile has to be cleaned up
    // before any build error is propagated.
    let build_outcome = BuildCommand::new(".")
        .file(DOCKERFILE)
        .tag("streaming-example:latest")
        .stream(StreamHandler::print())
        .await;

    // Always attempt the cleanup; its own error is only reported once the
    // build outcome has been handled.
    let cleanup = std::fs::remove_file(DOCKERFILE);

    // A build error takes precedence over a cleanup error.
    let result = build_outcome?;
    if result.is_success() {
        println!("\n✅ Build completed successfully!");
    } else {
        println!("\n❌ Build failed with exit code: {}", result.exit_code);
    }

    cleanup?;
    Ok(())
}
/// Example 2: runs an `alpine` container and handles its output line by line.
///
/// Every stdout line is printed with a `Container:` prefix and counted;
/// stderr lines are printed to stderr with a `Container Error:` prefix and
/// are not counted. The exit code and the stdout line count are reported
/// once the streamed command has finished.
///
/// # Errors
///
/// Returns an error if the streamed run command returns an error.
async fn example_run_streaming() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n🚀 Example 2: Streaming Container Output");
    println!("----------------------------------------");
    println!("Running container with streaming output...\n");

    // Command executed inside the container.
    let shell_args: Vec<String> = [
        "sh",
        "-c",
        "for i in 1 2 3 4 5; do echo \"Line $i\"; sleep 0.1; done",
    ]
    .iter()
    .map(|part| part.to_string())
    .collect();

    // The counter is shared between the handler closure and this function.
    let stdout_lines = Arc::new(AtomicUsize::new(0));
    let handler_counter = Arc::clone(&stdout_lines);

    let outcome = RunCommand::new("alpine")
        .cmd(shell_args)
        .remove()
        .stream(move |output| {
            match output {
                OutputLine::Stdout(text) => {
                    println!("Container: {}", text);
                    handler_counter.fetch_add(1, Ordering::SeqCst);
                }
                OutputLine::Stderr(text) => {
                    eprintln!("Container Error: {}", text);
                }
            }
        })
        .await?;

    let processed = stdout_lines.load(Ordering::SeqCst);
    println!("\n✅ Container exited with code: {}", outcome.exit_code);
    println!(" Processed {} lines of output", processed);
    Ok(())
}
/// Example 3: streams a container's logs and filters them in the handler.
///
/// A detached `alpine` container named `streaming-log-example` is started; it
/// writes "Log entry N" to stdout and "Error: ..." to stderr five times. Its
/// logs are then followed, printing and counting only stdout lines containing
/// `"Log entry"` and stderr lines containing `"Error"`.
///
/// A failure while streaming the logs is reported on stderr but does not fail
/// the example: the counters and the cleanup step still run.
///
/// # Errors
///
/// Returns an error only if starting the container fails.
async fn example_logs_filtering() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📜 Example 3: Streaming Logs with Filtering");
    println!("-------------------------------------------");
    println!("Creating a container for log streaming...");

    let container_name = "streaming-log-example";
    RunCommand::new("alpine")
        .name(container_name)
        .detach()
        .remove()
        .cmd(vec![
            "sh".to_string(),
            "-c".to_string(),
            "for i in 1 2 3 4 5; do echo \"Log entry $i\"; echo \"Error: Something went wrong $i\" >&2; sleep 1; done".to_string(),
        ])
        .execute()
        .await?;

    println!("Streaming logs with custom filtering...\n");

    // Counters shared between the handler closure and this function.
    let error_count = Arc::new(AtomicUsize::new(0));
    let info_count = Arc::new(AtomicUsize::new(0));
    let error_clone = Arc::clone(&error_count);
    let info_clone = Arc::clone(&info_count);

    let stream_outcome = LogsCommand::new(container_name)
        .follow()
        .timestamps()
        .tail("all")
        .stream(move |line| match line {
            OutputLine::Stdout(text) => {
                if text.contains("Log entry") {
                    println!("[INFO] {}", text);
                    info_clone.fetch_add(1, Ordering::SeqCst);
                }
            }
            OutputLine::Stderr(text) => {
                if text.contains("Error") {
                    eprintln!("[ERROR] {}", text);
                    error_clone.fetch_add(1, Ordering::SeqCst);
                }
            }
        })
        .await;

    // Surface a streaming failure instead of silently claiming success, but
    // keep going so the counters are shown and the container is stopped.
    match stream_outcome {
        Ok(_) => println!("\n✅ Log streaming completed"),
        Err(err) => eprintln!("\n⚠️ Log streaming ended with an error: {}", err),
    }
    println!(" Info messages: {}", info_count.load(Ordering::SeqCst));
    println!(" Error messages: {}", error_count.load(Ordering::SeqCst));

    // Best-effort cleanup: the result is intentionally ignored.
    // NOTE(review): the container may already have exited by the time log
    // following ends, in which case `docker stop` is expected to fail — confirm.
    let _ = std::process::Command::new("docker")
        .args(["stop", container_name])
        .output();

    Ok(())
}
/// Example 4: receives container output through a channel.
///
/// Runs an `alpine` container via `stream_channel()`, which yields a receiver
/// of output lines (plus a result value that is unused here). A spawned task
/// drains the receiver, printing each line and collecting the stdout ones;
/// the number of collected lines is reported at the end.
///
/// # Errors
///
/// Returns an error if `stream_channel()` returns an error or if the spawned
/// processing task cannot be joined.
async fn example_channel_streaming() -> Result<(), Box<dyn std::error::Error>> {
    println!("\n📡 Example 4: Channel-based Streaming");
    println!("-------------------------------------");
    println!("Using channel to process output asynchronously...\n");

    // Command executed inside the container.
    let shell_args = vec![
        "sh".to_string(),
        "-c".to_string(),
        "for i in 1 2 3; do echo \"Data: $i\"; sleep 0.5; done".to_string(),
    ];

    let (mut receiver, _stream_result) = RunCommand::new("alpine")
        .cmd(shell_args)
        .remove()
        .stream_channel()
        .await?;

    // Drain the channel on a separate task until it yields `None`.
    let collector = tokio::spawn(async move {
        let mut stdout_lines = Vec::new();
        while let Some(output) = receiver.recv().await {
            match output {
                OutputLine::Stdout(text) => {
                    println!("Received via channel: {}", text);
                    stdout_lines.push(text);
                }
                OutputLine::Stderr(text) => {
                    eprintln!("Error via channel: {}", text);
                }
            }
        }
        stdout_lines
    });

    let collected = collector.await?;
    println!("\n✅ Channel streaming completed");
    println!(" Collected {} lines via channel", collected.len());
    Ok(())
}
/// Bonus example: shows a rough progress percentage while building an image.
///
/// Every stdout line containing `"Step"` advances a counter and is printed
/// with a percentage computed against a fixed estimate of five steps; other
/// stdout lines are printed indented. Stderr lines are ignored.
///
/// The percentage is only an estimate, so it is capped at 100% in case the
/// build emits more "Step" lines than the estimate.
///
/// # Errors
///
/// Returns an error if the streamed build command returns an error.
// Not called from `main`; kept as an optional extra example.
#[allow(dead_code)]
async fn example_build_progress() -> Result<(), Box<dyn std::error::Error>> {
    // Estimated number of build steps used as the progress denominator.
    const TOTAL_STEPS: u32 = 5;

    println!("\n🏗️ Bonus: Build Progress Tracking");
    println!("---------------------------------");

    let mut current_step: u32 = 0;
    let result = BuildCommand::new(".")
        .tag("progress-example:latest")
        .stream(move |line| {
            if let OutputLine::Stdout(text) = line {
                if text.contains("Step") {
                    current_step += 1;
                    // Cap the display so that extra "Step" lines never show
                    // more than 100%.
                    let progress =
                        (f64::from(current_step) / f64::from(TOTAL_STEPS) * 100.0).min(100.0);
                    println!("[{:.0}%] {}", progress, text);
                } else {
                    println!(" {}", text);
                }
            }
        })
        .await?;

    if result.is_success() {
        println!("\n✅ Build completed with progress tracking!");
    }
    Ok(())
}