soil-telemetry 0.2.0

Telemetry utils
Documentation
// This file is part of Soil.

// Copyright (C) Soil contributors.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

use futures::{
	prelude::*,
	ready,
	task::{Context, Poll},
};
use libp2p::{core::transport::timeout::TransportTimeout, Transport};
use std::{io, pin::Pin, time::Duration};

/// Timeout after which a connection attempt is considered failed. Includes the WebSocket HTTP
/// upgrading.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);

pub(crate) fn initialize_transport() -> Result<WsTrans, io::Error> {
	let transport = {
		let tcp_transport = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new());
		let inner = libp2p::dns::tokio::Transport::system(tcp_transport)?;
		libp2p::websocket::framed::WsConfig::new(inner).and_then(|connec, _| {
			let connec = connec
				.with(|item| {
					let item = libp2p::websocket::framed::OutgoingData::Binary(item);
					future::ready(Ok::<_, io::Error>(item))
				})
				.try_filter_map(|item| async move {
					if let libp2p::websocket::framed::Incoming::Data(data) = item {
						Ok(Some(data.into_bytes()))
					} else {
						Ok(None)
					}
				});
			future::ready(Ok::<_, io::Error>(connec))
		})
	};

	Ok(TransportTimeout::new(
		transport.map(|out, _| {
			let out = out
				.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
				.sink_map_err(|err| io::Error::new(io::ErrorKind::Other, err));
			Box::pin(out) as Pin<Box<_>>
		}),
		CONNECT_TIMEOUT,
	)
	.boxed())
}

/// A trait that implements `Stream` and `Sink`.
pub(crate) trait StreamAndSink<I>: Stream + Sink<I> {}
impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}

/// A type alias for the WebSocket transport.
pub(crate) type WsTrans = libp2p::core::transport::Boxed<
	Pin<
		Box<
			dyn StreamAndSink<Vec<u8>, Item = Result<Vec<u8>, io::Error>, Error = io::Error> + Send,
		>,
	>,
>;

/// Wraps around an `AsyncWrite` and implements `Sink`. Guarantees that each item being sent maps
/// to one call of `write`.
#[pin_project::pin_project]
pub(crate) struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);

impl<T> From<T> for StreamSink<T> {
	fn from(inner: T) -> StreamSink<T> {
		StreamSink(inner, None)
	}
}

impl<T: AsyncRead> Stream for StreamSink<T> {
	type Item = Result<Vec<u8>, io::Error>;

	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
		let this = self.project();
		let mut buf = vec![0; 128];
		match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
			Ok(0) => Poll::Ready(None),
			Ok(n) => {
				buf.truncate(n);
				Poll::Ready(Some(Ok(buf)))
			},
			Err(err) => Poll::Ready(Some(Err(err))),
		}
	}
}

impl<T: AsyncWrite> StreamSink<T> {
	fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
		let this = self.project();

		if let Some(buffer) = this.1 {
			if ready!(this.0.poll_write(cx, &buffer[..]))? != buffer.len() {
				log::error!(target: "telemetry",
					"Detected some internal buffering happening in the telemetry");
				let err = io::Error::new(io::ErrorKind::Other, "Internal buffering detected");
				return Poll::Ready(Err(err));
			}
		}

		*this.1 = None;
		Poll::Ready(Ok(()))
	}
}

impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
	type Error = io::Error;

	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		ready!(StreamSink::poll_flush_buffer(self, cx))?;
		Poll::Ready(Ok(()))
	}

	fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
		let this = self.project();
		debug_assert!(this.1.is_none());
		*this.1 = Some(item);
		Ok(())
	}

	fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		ready!(self.as_mut().poll_flush_buffer(cx))?;
		let this = self.project();
		AsyncWrite::poll_flush(this.0, cx)
	}

	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		ready!(self.as_mut().poll_flush_buffer(cx))?;
		let this = self.project();
		AsyncWrite::poll_close(this.0, cx)
	}
}