sans-io-runtime 0.3.0

An opinionated SANS-IO runtime for SDN and media servers
Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
    net::SocketAddr,
    time::{Duration, Instant},
};

use sans_io_runtime::backend::{BackendIncoming, BackendOutgoing};
use sans_io_runtime::collections::DynamicDeque;
use sans_io_runtime::{
    backend::PollBackend, Controller, WorkerInner, WorkerInnerInput, WorkerInnerOutput,
};

type ExtIn = ();
type ExtOut = ();
type ChannelId = ();
type Event = ();
type ICfg = EchoWorkerCfg;
type SCfg = ();

type OwnerType = ();

struct EchoWorkerCfg {
    bind: SocketAddr,
}

struct EchoWorker {
    worker: u16,
    backend_slot: usize,
    output: DynamicDeque<WorkerInnerOutput<OwnerType, ExtOut, ChannelId, Event, SCfg>, 16>,
    shutdown: bool,
}

impl WorkerInner<OwnerType, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg> for EchoWorker {
    fn build(worker: u16, cfg: EchoWorkerCfg) -> Self {
        log::info!("Create new echo task in addr {}", cfg.bind);
        Self {
            worker,
            backend_slot: 0,
            output: DynamicDeque::from([WorkerInnerOutput::Net(
                (),
                BackendOutgoing::UdpListen {
                    addr: cfg.bind,
                    reuse: true,
                },
            )]),
            shutdown: false,
        }
    }

    fn worker_index(&self) -> u16 {
        self.worker
    }

    fn tasks(&self) -> usize {
        if self.shutdown {
            0
        } else {
            1
        }
    }

    fn is_empty(&self) -> bool {
        self.shutdown && self.output.is_empty()
    }

    fn spawn(&mut self, _now: Instant, _cfg: SCfg) {}
    fn on_tick(&mut self, _now: Instant) {}
    fn on_event(
        &mut self,
        _now: Instant,
        event: WorkerInnerInput<OwnerType, ExtIn, ChannelId, Event>,
    ) {
        match event {
            WorkerInnerInput::Net(_owner, BackendIncoming::UdpListenResult { bind, result }) => {
                log::info!("UdpListenResult: {} {:?}", bind, result);
                self.backend_slot = result.expect("Should bind success").1;
            }
            WorkerInnerInput::Net(_owner, BackendIncoming::UdpPacket { from, slot, data }) => {
                assert!(data.len() <= 1500, "data too large");
                self.output.push_back(WorkerInnerOutput::Net(
                    (),
                    BackendOutgoing::UdpPacket {
                        slot,
                        to: from,
                        data,
                    },
                ));
            }
            _ => {}
        }
    }

    fn pop_output(
        &mut self,
        _now: Instant,
    ) -> Option<WorkerInnerOutput<OwnerType, ExtOut, ChannelId, Event, SCfg>> {
        self.output.pop_front()
    }

    fn on_shutdown(&mut self, _now: Instant) {
        if self.shutdown {
            return;
        }
        log::info!("EchoServer {} shutdown", self.worker);
        self.shutdown = true;
        self.output.push_back(WorkerInnerOutput::Net(
            (),
            BackendOutgoing::UdpUnlisten {
                slot: self.backend_slot,
            },
        ));
    }
}

fn main() {
    env_logger::init();
    let mut controller = Controller::<ExtIn, ExtOut, SCfg, ChannelId, Event, 1024>::default();
    controller.add_worker::<OwnerType, _, EchoWorker, PollBackend<_, 16, 1024>>(
        Duration::from_secs(1),
        EchoWorkerCfg {
            bind: SocketAddr::from(([127, 0, 0, 1], 10001)),
        },
        None,
    );
    controller.add_worker::<OwnerType, _, EchoWorker, PollBackend<_, 16, 1024>>(
        Duration::from_secs(1),
        EchoWorkerCfg {
            bind: SocketAddr::from(([127, 0, 0, 1], 10002)),
        },
        None,
    );
    let term = Arc::new(AtomicBool::new(false));
    signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&term))
        .expect("Should register hook");

    while controller.process().is_some() {
        if term.load(Ordering::Relaxed) {
            controller.shutdown();
        }
        std::thread::sleep(Duration::from_millis(10));
    }

    log::info!("Server shutdown");
}