use futures::sync::oneshot;
use serde::Deserialize;
use tokio::codec::{Framed, LengthDelimitedCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;
use crate::time::TimeStamp;
use crate::tsdb::{Observation, Sample, TsDbHandle};
pub struct PeerHandle {
kill_switch: oneshot::Sender<()>,
}
impl PeerHandle {
pub fn stop(self) {
self.kill_switch.send(()).unwrap();
}
}
#[derive(Deserialize, Debug)]
struct SampleBatch {
t0: f64,
dt: f64,
data: Vec<f64>,
}
impl SampleBatch {
fn to_samples(&self) -> Vec<Observation<Sample>> {
self.data
.iter()
.enumerate()
.map(|(index, value)| {
let t = self.t0 + self.dt * index as f64;
let timestamp = TimeStamp::new(t);
Observation::new(timestamp, Sample::new(*value))
})
.collect()
}
fn size(&self) -> usize {
self.data.len()
}
}
pub fn process_client(_counter: usize, socket: TcpStream, db: TsDbHandle) -> PeerHandle {
info!("Got incoming socket! {:?}", socket);
let trace_name = "Trace0";
let (_framed_sink, framed_stream) = Framed::new(socket, LengthDelimitedCodec::new()).split();
let client_task = framed_stream
.for_each(move |packet| {
let batch: SampleBatch = serde_cbor::from_slice(&packet).unwrap();
println!("DAATAA: {:?}", batch.size());
db.add_values(trace_name, batch.to_samples());
Ok(())
})
.map_err(|err| println!("Failed: {:?}", err));
let (kill_switch, c) = futures::sync::oneshot::channel::<()>();
let c = c.map_err(|_| ());
let task = client_task.select(c).map(|_| ()).map_err(|_| ());
tokio::spawn(task);
PeerHandle { kill_switch }
}