websocat 1.11.0

Command-line client for web sockets, like netcat/curl/socat for ws://.
Documentation
use futures::future::ok;

use std::rc::Rc;

use super::{BoxedNewPeerFuture, Peer};
use super::{ConstructParams, PeerConstructor, Specifier};
use std::time::{SystemTime, UNIX_EPOCH, Instant};

use std::io::Read;
use tokio_io::AsyncRead;

use std::io::Error as IoError;

/// Overlay specifier that passes every peer produced by the inner specifier
/// `T` through `timestamp_peer`.
#[derive(Debug)]
pub struct TimestampPeer<T: Specifier>(pub T);
impl<T: Specifier> Specifier for TimestampPeer<T> {
    /// Constructs the inner specifier and maps each resulting peer through
    /// `timestamp_peer`, using the `timestamp_monotonic` program option to
    /// select the timestamp mode.
    fn construct(&self, cp: ConstructParams) -> PeerConstructor {
        // Only this one flag is needed after the inner specifier is built, so
        // copy it out and hand `cp` over by value instead of cloning the whole
        // parameter set and keeping it alive inside the closure.
        let monotonic = cp.program_options.timestamp_monotonic;
        self.0
            .construct(cp)
            .map(move |peer, _| timestamp_peer(peer, monotonic))
    }
    specifier_boilerplate!(noglobalstate has_subspec);
    self_0_is_subspecifier!(proxy_is_multiconnect);
}
// Declares the specifier class `TimestampClass` for `TimestampPeer`. It is
// selected by the `timestamp:` prefix, takes a sub-specifier as its argument
// and is marked as an overlay. The `help` text is presumably what gets shown
// in the program's help output (confirm against the macro definition); its
// example section is still a TODO.
specifier_class!(
    name = TimestampClass,
    target = TimestampPeer,
    prefixes = ["timestamp:"],
    arg_handling = subspec,
    overlay = true,
    MessageOriented,
    MulticonnectnessDependsOnInnerType,
    help = r#"
[A] Prepend timestamp to each incoming message.

Example: TODO
"#
);

/// Builds a new peer whose first component is `inner_peer`'s reader wrapped
/// in a `TimestampWrapper`; the remaining two components are passed through
/// unchanged.
///
/// When `monotonic` is `true` the current `Instant` is captured here and given
/// to the wrapper as its base time; otherwise the wrapper receives `None`.
/// The result is returned as an already-resolved boxed future.
pub fn timestamp_peer(inner_peer: Peer, monotonic: bool) -> BoxedNewPeerFuture {
    // The reference point for monotonic timestamps is taken once, at wrap time.
    let base_instant = if monotonic {
        Some(Instant::now())
    } else {
        None
    };
    let wrapped = Peer::new(
        TimestampWrapper(inner_peer.0, base_instant),
        inner_peer.1,
        inner_peer.2,
    );
    Box::new(ok(wrapped)) as BoxedNewPeerFuture
}
/// Reader adapter that prefixes every chunk read from the wrapped reader with
/// a timestamp.
///
/// Field 0 is the wrapped reader. Field 1 is the base `Instant` used for
/// elapsed-time timestamps, or `None` to stamp with time since the Unix epoch.
struct TimestampWrapper(Box<dyn AsyncRead>, Option<Instant>);

impl Read for TimestampWrapper {
    fn read(&mut self, b: &mut [u8]) -> Result<usize, IoError> {
        let l = b.len();
        assert!(l > 1);
        let n = match self.0.read(&mut b[..l]) {
            Ok(x) => x,
            Err(e) => return Err(e),
        };
        if n == 0 {
            return Ok(0);
        }

        let mut v: Vec<u8> = Vec::with_capacity(n + 50);
        {
            let mut vv = ::std::io::Cursor::new(&mut v);
            use std::io::Write;
            let x = if let Some(basetime) = self.1 {
                Instant::now().duration_since(basetime).as_secs_f64()
            } else {
                (SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards")).as_secs_f64()
            };
            let _ = write!(vv, "{} ", x);
            let _ = vv.write_all(&b[..n]);
        }
        
        if v.len() as usize > l {
            warn!("Buffer too small, timstamp-prepended message may be truncated.");
        }
        let ll = v.len().min(l);
        (&mut b[..ll]).copy_from_slice(&v[..ll]);
        Ok(ll)
    }
}
// Empty impl: no `AsyncRead` methods are overridden, so the trait's defaults
// apply on top of the `Read` implementation. This lets `TimestampWrapper` be
// used wherever an `AsyncRead` is expected.
impl AsyncRead for TimestampWrapper {}