asciinema 3.2.0

Terminal session recorder, streamer, and player
use std::future;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use avt::Vt;
use futures_util::{stream, StreamExt};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::{io, time};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tracing::info;

use crate::session::{self, Metadata};
use crate::tty::{TtySize, TtyTheme};

pub struct Stream {
    request_tx: mpsc::Sender<Request>,
    request_rx: mpsc::Receiver<Request>,
}

type Request = oneshot::Sender<Subscription>;

struct Subscription {
    init: Event,
    events_rx: broadcast::Receiver<Event>,
}

#[derive(Clone, Copy)]
pub struct EventId(u64);

#[derive(Clone)]
pub enum Event {
    Init(EventId, Duration, TtySize, Option<TtyTheme>, String),
    Output(EventId, Duration, String),
    Input(EventId, Duration, String),
    Resize(EventId, Duration, TtySize),
    Marker(EventId, Duration, String),
    Exit(EventId, Duration, i32),
    Eot(EventId, Duration),
}

#[derive(Clone)]
pub struct Subscriber(mpsc::Sender<Request>);

pub struct LiveStream(mpsc::Sender<session::Event>);

impl Stream {
    pub fn new() -> Self {
        let (request_tx, request_rx) = mpsc::channel(16);

        Stream {
            request_tx,
            request_rx,
        }
    }

    pub fn subscriber(&self) -> Subscriber {
        Subscriber(self.request_tx.clone())
    }

    pub async fn start(self, metadata: &Metadata) -> LiveStream {
        let (stream_tx, stream_rx) = mpsc::channel(1024);
        let request_rx = self.request_rx;

        tokio::spawn(run(
            metadata.term.size,
            metadata.term.theme.clone(),
            stream_rx,
            request_rx,
        ));

        LiveStream(stream_tx)
    }
}

async fn run(
    tty_size: TtySize,
    tty_theme: Option<TtyTheme>,
    mut stream_rx: mpsc::Receiver<session::Event>,
    mut request_rx: mpsc::Receiver<Request>,
) {
    let (broadcast_tx, _) = broadcast::channel(1024);
    let mut vt = build_vt(tty_size);
    let mut stream_time = Duration::from_micros(0);
    let mut last_event_id = EventId::new(0);
    let mut last_event_time = Instant::now();

    loop {
        tokio::select! {
            event = stream_rx.recv() => {
                match event {
                    Some(event) => {
                        last_event_time = Instant::now();
                        last_event_id = last_event_id.next();

                        match event {
                            session::Event::Output(time, text) => {
                                vt.feed_str(&text);
                                let _ = broadcast_tx.send(Event::Output(last_event_id, time, text));
                                stream_time = time;
                            }

                            session::Event::Input(time, text) => {
                                let _ = broadcast_tx.send(Event::Input(last_event_id, time, text));
                                stream_time = time;
                            }

                            session::Event::Resize(time, tty_size) => {
                                vt.resize(tty_size.0.into(), tty_size.1.into());
                                let _ = broadcast_tx.send(Event::Resize(last_event_id, time, tty_size));
                                stream_time = time;
                            }

                            session::Event::Marker(time, label) => {
                                let _ = broadcast_tx.send(Event::Marker(last_event_id, time, label));
                                stream_time = time;
                            }

                            session::Event::Exit(time, status) => {
                                let _ = broadcast_tx.send(Event::Exit(last_event_id, time, status));
                                stream_time = time;
                            }
                        }
                    }

                    None => {
                        let time = stream_time + last_event_time.elapsed();
                        let id = last_event_id.next();
                        let _ = broadcast_tx.send(Event::Eot(id, time));
                        break;
                    },
                }
            }

            request = request_rx.recv() => {
                match request {
                    Some(request) => {
                        let init = if last_event_id.as_u64() > 0 {
                            let elapsed_time = stream_time + last_event_time.elapsed();

                            Event::Init(
                                last_event_id,
                                elapsed_time,
                                vt.size().into(),
                                tty_theme.clone(),
                                vt.dump(),
                            )
                        } else {
                            Event::Init(
                                last_event_id,
                                stream_time,
                                vt.size().into(),
                                tty_theme.clone(),
                                "".to_owned(),
                            )
                        };

                        let events_rx = broadcast_tx.subscribe();
                        let _ = request.send(Subscription { init, events_rx });
                        info!("subscriber count: {}", broadcast_tx.receiver_count());
                    }

                    None => break,
                }
            }
        }
    }
}

impl EventId {
    fn new(id: u64) -> Self {
        EventId(id)
    }

    fn next(&self) -> Self {
        EventId(self.0 + 1)
    }

    pub fn as_u64(&self) -> u64 {
        self.0
    }
}

impl From<EventId> for u64 {
    fn from(id: EventId) -> Self {
        id.0
    }
}

impl From<u64> for EventId {
    fn from(id: u64) -> Self {
        EventId(id)
    }
}

impl Subscriber {
    pub async fn subscribe(
        &self,
    ) -> anyhow::Result<impl futures_util::Stream<Item = Result<Event, BroadcastStreamRecvError>>>
    {
        let (tx, rx) = oneshot::channel();
        self.0.send(tx).await?;
        let subscription = time::timeout(Duration::from_secs(5), rx).await??;
        let init = stream::once(future::ready(Ok(subscription.init)));
        let events = BroadcastStream::new(subscription.events_rx);

        Ok(init.chain(events))
    }
}

fn build_vt(tty_size: TtySize) -> Vt {
    Vt::builder()
        .size(tty_size.0 as usize, tty_size.1 as usize)
        .scrollback_limit(1000)
        .build()
}

#[async_trait]
impl session::Output for LiveStream {
    async fn event(&mut self, event: session::Event) -> io::Result<()> {
        self.0.send(event).await.map_err(io::Error::other)
    }

    async fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}