gundb 0.2.1

Distributed graph database that syncs over websockets and multicast
Documentation
use crate::message::Message;
use crate::actor::{Actor, ActorContext};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::SinkExt;

use async_trait::async_trait;

use futures_util::{future, TryStreamExt};
use log::{info, error, debug};

use tokio_tungstenite::{WebSocketStream, MaybeTlsStream};
use tokio_tungstenite::tungstenite::{Message as WsMessage};

type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
type WsSender = SplitSink<WsStream, WsMessage>;
type WsReceiver = SplitStream<WsStream>;

pub struct WsConn {
    sender: WsSender,
    receiver: Option<WsReceiver>
}

impl WsConn {
    pub fn new(sender: WsSender, receiver: WsReceiver) -> Self {
        Self {
            sender: sender,
            receiver: Some(receiver)
        }
    }
}

#[async_trait]
impl Actor for WsConn {
    async fn handle(&mut self, msg: Message, _ctx: &ActorContext) {
        let _ = self.sender.send(WsMessage::Text(msg.to_string())).await;
    }

    async fn pre_start(&mut self, ctx: &ActorContext) {
        info!("WsConn starting");
        let hi = Message::Hi { from: ctx.addr.clone(), peer_id: ctx.peer_id.read().unwrap().clone() };
        let _ = self.sender.send(WsMessage::Text(hi.to_string())).await;
        let receiver = self.receiver.take().unwrap();
        let ctx2 = ctx.clone();
        ctx.abort_on_stop(tokio::spawn(async move {
            let _ = receiver.try_for_each(|msg| {
                if let Ok(s) = msg.to_text() {
                    match Message::try_from(s, ctx2.addr.clone()) {
                        Ok(msgs) => {
                            debug!("ws_conn in {}", s);
                            for msg in msgs.into_iter() {
                                if let Err(e) = ctx2.router.sender.send(msg) {
                                    error!("failed to send incoming message to node: {}", e);
                                }
                            }
                        },
                        _ => {}
                    };
                }
                future::ok(())
            }).await;
        }));
    }

    async fn stopping(&mut self, _context: &ActorContext) {
        info!("WsConn stopping");
    }
}