wireframe 0.3.0

Simplify building servers and clients for custom binary protocols.
Documentation
//! Demonstrates multi-packet responses using `Response::with_channel`.
//!
//! The example splits a transcript into multiple frames and streams them via a
//! bounded channel. Two background tasks send frames concurrently to showcase
//! how cloning the sender enables cooperative production while back-pressure
//! keeps senders in lock-step with the consumer.

use std::{
    fmt::{self, Display, Formatter},
    time::Duration,
};

use futures::TryStreamExt;
use tokio::time::sleep;
use tracing::info;
use wireframe::{WireframeError, response::Response};

const TRANSCRIPT: &[&str] = &[
    "Client: HELLO",
    "Server: HELLO-ACK",
    "Client: BEGIN",
    "Server: PREPARED",
];

#[derive(Debug, Clone, PartialEq, Eq)]
enum FrameKind {
    Chunk(usize),
    Summary,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct Frame {
    kind: FrameKind,
    data: String,
}

impl Frame {
    fn chunk(index: usize, data: &str) -> Self {
        Self {
            kind: FrameKind::Chunk(index),
            data: data.to_owned(),
        }
    }

    fn summary(total_chunks: usize) -> Self {
        Self {
            kind: FrameKind::Summary,
            data: format!("{total_chunks} transcript entries sent"),
        }
    }
}

impl Display for Frame {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        match self.kind {
            FrameKind::Chunk(index) => write!(formatter, "Chunk {index}: {}", self.data),
            FrameKind::Summary => write!(formatter, "Summary: {}", self.data),
        }
    }
}

fn multi_packet_response() -> Response<Frame> {
    // Capacity two keeps memory usage tight and amplifies the back-pressure
    // effect for demonstration purposes.
    let (sender, response) = Response::with_channel(2);

    let summary_sender = sender.clone();
    let chunk_task = tokio::spawn(async move {
        // Capture the join handle so the summary task can wait for completion
        // before sending its final frame.
        for (index, line) in TRANSCRIPT.iter().enumerate() {
            let frame = Frame::chunk(index, line);
            if sender.send(frame).await.is_err() {
                tracing::trace!("connection dropped, stopping chunk task early");
                break;
            }

            // The brief pause simulates I/O or computation between frames.
            sleep(Duration::from_millis(25)).await;
        }
    });

    tokio::spawn(async move {
        // Wait for all chunks to be sent before delivering the summary.
        let _ = chunk_task.await;
        let summary = Frame::summary(TRANSCRIPT.len());
        if summary_sender.send(summary).await.is_err() {
            tracing::trace!("connection dropped, summary not sent");
        }
    });

    response
}

fn log_frame(frame: &Frame) {
    info!("{frame}");
}

async fn run() -> Result<(), WireframeError<()>> {
    let _ = tracing_subscriber::fmt::try_init();
    let response = multi_packet_response();
    let mut stream = response.into_stream();
    while let Some(frame) = stream.try_next().await? {
        log_frame(&frame);
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    runtime
        .block_on(run())
        .map_err(|error| std::io::Error::other(format!("{error:?}")))?;
    Ok(())
}