servant 0.1.0

Reserve the name.
Documentation
// -- adapter.rs --

use {
    super::{
        drop_guard::DropGuard,
        servant::{Record, ServantRegister, ServantResult},
    },
    async_std::{net::TcpStream, prelude::*, sync::Mutex, task},
    codec::RecordCodec,
    futures::{
        channel::mpsc::{unbounded, UnboundedSender},
        pin_mut, select,
        sink::SinkExt,
        FutureExt as _,
    },
    futures_codec::{FramedRead, FramedWrite},
    log::{info, warn},
    std::{collections::HashMap, net::SocketAddr},
};

// --

// Process-wide `AdapterRegister` singleton, built lazily by `lazy_static!`
// the first time it is dereferenced. It starts with notice id 0, no accept
// channel and an empty per-peer sender table.
lazy_static! {
    static ref ADAPTER_REGISTER: AdapterRegister = AdapterRegister(Mutex::new(_Register {
        // Value that `AdapterRegister::clean` compares its argument against.
        // NOTE(review): why 238 was chosen is not visible in this file — confirm.
        passcode: 238,
        id: 0,
        accept_tx: None,
        senders: HashMap::new(),
    }));
}

/// Mutable state behind the `AdapterRegister` mutex.
struct _Register {
    /// Value a caller of `AdapterRegister::clean` must supply for the clean-up
    /// to take effect.
    passcode: usize,
    /// Counter bumped on every `AdapterRegister::send`; the new value becomes
    /// the `id` of the emitted `Record::Notice`.
    id: usize,
    /// Sender stored by `set_accept` and dropped by `clean`.
    /// NOTE(review): presumably keeps the accept loop alive while held —
    /// the receiving side is not visible here, confirm.
    accept_tx: Option<UnboundedSender<()>>,
    /// Outgoing record channel for each registered peer, keyed by the peer's
    /// socket address.
    senders: HashMap<SocketAddr, UnboundedSender<Record>>,
}

/// Registry of the outgoing channels of all registered peers, guarded by an
/// async mutex and reachable through [`AdapterRegister::instance`].
pub struct AdapterRegister(Mutex<_Register>);

impl AdapterRegister {
    /// Returns the process-wide registry.
    pub fn instance() -> &'static Self {
        &ADAPTER_REGISTER
    }

    /// Drops the stored accept sender and forgets every registered peer
    /// sender, but only when `passcode` equals the registry's own passcode;
    /// otherwise nothing happens.
    pub async fn clean(&self, passcode: usize) {
        let mut state = self.0.lock().await;
        if state.passcode != passcode {
            return;
        }
        state.accept_tx = None;
        state.senders.clear();
    }

    /// Stores `tx` as the accept sender, replacing any previous one.
    pub async fn set_accept(&self, tx: UnboundedSender<()>) {
        self.0.lock().await.accept_tx = Some(tx);
    }

    /// Registers `tx` as the outgoing channel of the peer at `addr`,
    /// replacing an existing entry for the same address.
    pub async fn insert(&self, addr: SocketAddr, tx: UnboundedSender<Record>) {
        self.0.lock().await.senders.insert(addr, tx);
    }

    /// Removes the outgoing channel registered for `addr`, if there is one.
    pub async fn remove(&self, addr: &SocketAddr) {
        self.0.lock().await.senders.remove(addr);
    }

    /// Queues a `Record::Notice` carrying `msg` on every registered peer
    /// channel.
    ///
    /// Each call increments the notice id first, so all peers receive the
    /// same id for the same message. A failed send is logged with `warn!`
    /// and does not stop delivery to the remaining peers.
    pub async fn send(&self, msg: Vec<u8>) {
        let mut state = self.0.lock().await;
        state.id += 1;
        let notice = Record::Notice { id: state.id, msg };
        // `Sink` is implemented for `&UnboundedSender`, hence the `mut`
        // binding of a shared reference.
        for mut sender in state.senders.values() {
            if let Err(e) = sender.send(notice.clone()).await {
                warn!("{}", e.to_string());
            }
        }
    }
}

// --

/// Stateless handler for a single TCP connection; all the work happens in
/// `Adapter::run`.
pub struct Adapter;

impl Adapter {
    pub fn new() -> Self {
        Self
    }
    pub async fn run(self, stream: TcpStream) -> std::io::Result<()> {
        #[derive(Debug)]
        enum SelectedValue {
            ReadNone,
            WriteNone,
            Read(Record),
            Write(Record),
        };

        let addr = stream.peer_addr().unwrap();
        info!("connected from {}", &addr);
        let (reader, writer) = &mut (&stream, &stream);
        let read_framed = FramedRead::new(reader, RecordCodec::<u32, Record>::default());
        let mut write_framed = FramedWrite::new(writer, RecordCodec::<u32, Record>::default());

        let (tx, rx) = unbounded();
        AdapterRegister::instance().insert(addr, tx).await;

        let _adapter_clean = DropGuard::new(addr, |a| {
            task::block_on(async move {
                info!("adapter from {} quit.", &addr);
                AdapterRegister::instance().remove(&a).await;
            });
        });

        pin_mut!(read_framed, rx);
        loop {
            let value = select! {
                from_terminal = read_framed.next().fuse() => match from_terminal {
                    Some(record) => SelectedValue::Read(record?),
                    None => SelectedValue::ReadNone,
                },
                to_terminal = rx.next().fuse() => match to_terminal {
                    Some(record) => SelectedValue::Write(record),
                    None => SelectedValue::WriteNone,
                },
            };
            let sr = ServantRegister::instance();
            match value {
                SelectedValue::Read(record) => match record {
                    Record::Report { id, oid, msg } => {
                        let _id = id;
                        if let Some(servant) = sr.find_report_servant(&oid) {
                            servant.lock().unwrap().serve(msg);
                        } else {
                            warn!("{} dosen't exist.", &oid);
                        }
                    }
                    Record::Request { id, ctx, oid, req } => {
                        let ret: ServantResult<Vec<u8>> = if let Some(oid) = &oid {
                            if let Some(servant) = sr.find_servant(oid) {
                                Ok(servant.lock().unwrap().serve(ctx, req))
                            } else {
                                Err(format!("{} dosen't exist.", &oid).into())
                            }
                        } else {
                            if let Some(query) = sr.query_servant() {
                                let mut q = query.lock().unwrap();
                                Ok(q.serve(ctx, req))
                            } else {
                                Err("query servant dosen't exist.".into())
                            }
                        };
                        match bincode::serialize(&ret) {
                            Ok(ret) => {
                                let record = Record::Response { id, oid, ret };
                                write_framed.send(record).await?;
                            }
                            Err(e) => warn!("{}", e.to_string()),
                        }
                    }
                    Record::Response { .. } => unreachable!(),
                    Record::Notice { .. } => unreachable!(),
                },
                SelectedValue::Write(record) => {
                    write_framed.send(record).await?;
                }
                _ => {
                    info!(
                        "loop break due to SelectValue({:?}). peer address: {}",
                        &value, &addr
                    );
                    break;
                }
            }
        }

        Ok(())
    }
}