infotainer 0.0.1-alpha.1

building blocks for simple pubsub services
Documentation
use core::time;
use std::{convert::TryFrom, io::Error, str::FromStr, thread};

use actix::{
    io::{SinkWrite, WriteHandler},
    Actor, ActorContext, Arbiter, AsyncContext, Context, Handler, Message as ActorMessage,
    StreamHandler, System,
};
use actix_codec::Framed;
use actix_web::{
    client::{Client, WsProtocolError},
    web::Bytes,
};
use awc::{
    ws::{Codec, Frame, Message},
    BoxedSocket,
};
use futures::{stream::SplitSink, StreamExt};
use infotainer::{websocket::{ClientCommand, ServerMessage}};
use itertools::Itertools;
use uuid::Uuid;

static CLI_COMMANDS: &[&str] = &["PublishText", "Subscribe", "Unsubscribe"];

struct Connection(SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>);

impl Actor for Connection {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Context<Self>) {
        self.hb(ctx)
    }

    fn stopped(&mut self, _: &mut Context<Self>) {
        System::current().stop();
    }
}

impl StreamHandler<Result<Frame, WsProtocolError>> for Connection {
    fn handle(&mut self, msg: Result<Frame, WsProtocolError>, _: &mut Context<Self>) {
        match msg {
            Ok(frame) => match frame {
                Frame::Binary(data) => {
                    if let Ok(iss) = serde_cbor::from_slice::<ServerMessage>(&data) {
                        match iss {
                            ServerMessage::Issue(i) => {
                                let cmd = ClientCommand::GetLogEntries {
                                    log_id: i.0,
                                    entries: vec![i.1],
                                };
                                self.0.write(Message::Binary(Bytes::from(
                                    serde_cbor::to_vec(&cmd).unwrap(),
                                )));
                            }
                            ServerMessage::LogEntry(e) => {
                                for p in e {
                                    let data: String = String::from_utf8(p.data).unwrap();
                                    println!(
                                        "Received publication {} for Subscription {}:\n{}",
                                        p.publication_id, p.subscription_id, data
                                    )
                                }
                            }
                            ServerMessage::LogIndex(i) => println!("{:?}", i),
                        }
                    } else {
                        println!("Unable to handle received message");
                    }
                }
                _ => (),
            },
            Err(e) => println!("{:?}", e),
        }
    }

    fn started(&mut self, _: &mut Context<Self>) {
        println!("Connected");
    }

    fn finished(&mut self, ctx: &mut Context<Self>) {
        println!("Disconnected");
        ctx.stop();
    }
}

impl WriteHandler<WsProtocolError> for Connection {}

impl Handler<CliCommand> for Connection {
    type Result = ();

    fn handle(&mut self, msg: CliCommand, _: &mut Self::Context) -> Self::Result {
        self.0.write(Message::Binary(Bytes::from(
            serde_cbor::to_vec(&ClientCommand::from(msg.into())).unwrap(),
        )));
    }
}

impl Connection {
    fn hb(&self, ctx: &mut Context<Self>) {
        ctx.run_interval(time::Duration::new(5, 0), |act, _| {
            act.0.write(Message::Ping(Bytes::new()));
        });
    }
}

#[derive(Debug, ActorMessage)]
#[rtype("()")]
enum CliCommand {
    PublishText(Uuid, String),
    Subscribe(Uuid),
    Unsubscribe(Uuid),
}

impl TryFrom<String> for CliCommand {
    type Error = Box<dyn std::error::Error>;

    fn try_from(cmdline: String) -> Result<Self, Self::Error> {
        let mut cli_input = cmdline.split(" ");
        let cmd = cli_input.next().ok_or(Error::new(
            std::io::ErrorKind::InvalidInput,
            "Missing command",
        ))?;
        if !CLI_COMMANDS.contains(&cmd) {}
        let id = Uuid::from_str(cli_input.next().ok_or(Error::new(
            std::io::ErrorKind::InvalidInput,
            "Missing log id parameter",
        ))?)?;
        match cmd {
            "PublishText" => Ok(CliCommand::PublishText(id, cli_input.join(" "))),
            "Subscribe" => Ok(CliCommand::Subscribe(id)),
            "Unsubscribe" => Ok(CliCommand::Unsubscribe(id)),
            _ => Err(Box::new(Error::new(
                std::io::ErrorKind::InvalidInput,
                "Invalid command",
            ))),
        }
    }
}

impl Into<ClientCommand> for CliCommand {
    fn into(self) -> ClientCommand {
        match self {
            CliCommand::PublishText(subscription_id, submission) => {
                ClientCommand::SubmitPublication {
                    subscription_id,
                    submission: submission.into(),
                }
            }
            CliCommand::Subscribe(subscription_id) => ClientCommand::Subscribe { subscription_id },
            CliCommand::Unsubscribe(subscription_id) => {
                ClientCommand::Unsubscribe { subscription_id }
            }
        }
    }
}

fn main() -> std::io::Result<()> {
    env_logger::init();
    let client_id = Uuid::new_v4();

    let sys = System::new("infotainer-client-example");

    Arbiter::spawn(async move {
        let (response, framed) = Client::default()
            .ws(format!("ws://127.0.0.1:1312/ws/{}", client_id))
            .connect()
            .await
            .unwrap();
        println!("Response: {:?}", response);
        let (sink, stream) = framed.split();
        let conn = Connection::create(|ctx| {
            Connection::add_stream(stream, ctx);
            Connection(SinkWrite::new(sink, ctx))
        });
        thread::spawn(move || loop {
            let mut cmd = String::default();
            if let Err(e) = std::io::stdin().read_line(&mut cmd) {
                println!("Could not read from commandline: {:?}", e);
                return;
            }
            match CliCommand::try_from(cmd.strip_suffix("\n").unwrap().to_owned()) {
                Ok(c) => conn.do_send(c),
                Err(e) => println!("Error: {:?}", e),
            }
        });
    });
    sys.run()
}