sans-io-runtime 0.3.0

An opinionated SANS-IO runtime for SDN and media servers
Documentation
use std::{
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

use sans_io_runtime::{
    backend::{BackendIncoming, BackendOutgoing, PollBackend},
    collections::DynamicDeque,
    group_owner_type, Buffer, Controller, Task, TaskGroup, TaskGroupOutput, TaskSwitcherChild,
    WorkerInner, WorkerInnerInput, WorkerInnerOutput,
};

type ExtIn = ();
type ExtOut = ();
type ICfg = ();
type SCfg = EchoTaskCfg;
type ChannelId = ();
type Event = ();

enum EchoTaskOutput {
    Net(BackendOutgoing),
    OnResourceEmpty,
}

#[derive(Debug, Clone)]
struct EchoTaskCfg {
    count: usize,
    dest: SocketAddr,
    brust_size: usize,
}

struct EchoTask {
    count: usize,
    cfg: EchoTaskCfg,
    local_addr: SocketAddr,
    local_backend_slot: usize,
    output: DynamicDeque<EchoTaskOutput, 16>,
    shutdown: bool,
}

impl EchoTask {
    pub fn new(cfg: EchoTaskCfg) -> Self {
        log::info!("Create new echo client task in addr {}", cfg.dest);
        Self {
            count: 0,
            cfg,
            local_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
            local_backend_slot: 0,
            output: DynamicDeque::from([EchoTaskOutput::Net(BackendOutgoing::UdpListen {
                addr: SocketAddr::from(([127, 0, 0, 1], 0)),
                reuse: false,
            })]),
            shutdown: false,
        }
    }
}

impl Task<BackendIncoming, EchoTaskOutput> for EchoTask {
    fn on_tick(&mut self, _now: Instant) {}

    fn on_event(&mut self, _now: Instant, input: BackendIncoming) {
        match input {
            BackendIncoming::UdpListenResult { bind, result } => {
                log::info!("UdpListenResult: {} {:?}", bind, result);
                if let Ok((addr, slot)) = result {
                    self.local_addr = addr;
                    self.local_backend_slot = slot;
                    for _ in 0..self.cfg.brust_size {
                        self.output
                            .push_back(EchoTaskOutput::Net(BackendOutgoing::UdpPacket {
                                slot: self.local_backend_slot,
                                to: self.cfg.dest,
                                data: Buffer::from([0; 1000].to_vec()),
                            }));
                    }
                }
            }
            BackendIncoming::UdpPacket { from, slot, data } => {
                self.count += 1;
                if self.count >= self.cfg.count {
                    log::info!("EchoTask done");
                    self.shutdown = true;
                } else {
                    self.output
                        .push_back(EchoTaskOutput::Net(BackendOutgoing::UdpPacket {
                            slot,
                            to: from,
                            data,
                        }));
                }
            }
        }
    }

    fn on_shutdown(&mut self, _now: Instant) {
        log::info!("EchoTask shutdown");
        self.output
            .push_back(EchoTaskOutput::Net(BackendOutgoing::UdpUnlisten {
                slot: self.local_backend_slot,
            }));
        self.shutdown = true;
    }
}

impl TaskSwitcherChild<EchoTaskOutput> for EchoTask {
    type Time = Instant;

    fn empty_event(&self) -> EchoTaskOutput {
        EchoTaskOutput::OnResourceEmpty
    }

    fn is_empty(&self) -> bool {
        self.shutdown && self.output.is_empty()
    }

    fn pop_output(&mut self, _now: Instant) -> Option<EchoTaskOutput> {
        self.output.pop_front()
    }
}

group_owner_type!(OwnerType);

struct EchoWorkerInner {
    worker: u16,
    tasks: TaskGroup<BackendIncoming, EchoTaskOutput, EchoTask, 16>,
}

impl WorkerInner<OwnerType, ExtIn, ExtOut, ChannelId, Event, ICfg, SCfg> for EchoWorkerInner {
    fn tasks(&self) -> usize {
        self.tasks.tasks()
    }

    fn is_empty(&self) -> bool {
        self.tasks.is_empty()
    }

    fn worker_index(&self) -> u16 {
        self.worker
    }

    fn build(worker: u16, _cfg: ICfg) -> Self {
        Self {
            worker,
            tasks: TaskGroup::default(),
        }
    }

    fn spawn(&mut self, _now: Instant, cfg: SCfg) {
        self.tasks.add_task(EchoTask::new(cfg));
    }

    fn on_tick(&mut self, now: Instant) {
        self.tasks.on_tick(now);
    }

    fn on_event(
        &mut self,
        now: Instant,
        event: WorkerInnerInput<OwnerType, ExtIn, ChannelId, Event>,
    ) {
        match event {
            WorkerInnerInput::Net(owner, event) => {
                self.tasks.on_event(now, owner.index(), event);
            }
            _ => unreachable!(),
        }
    }

    fn pop_output(
        &mut self,
        now: Instant,
    ) -> Option<WorkerInnerOutput<OwnerType, ExtOut, ChannelId, Event, SCfg>> {
        loop {
            if let Some(TaskGroupOutput::TaskOutput(index, out)) = self.tasks.pop_output(now) {
                if let Some(out) = self.convert_output(OwnerType(index), out) {
                    return Some(out);
                }
            }
        }
    }

    fn on_shutdown(&mut self, now: Instant) {
        self.tasks.on_shutdown(now);
    }
}

impl EchoWorkerInner {
    fn convert_output(
        &mut self,
        owner: OwnerType,
        out: EchoTaskOutput,
    ) -> Option<WorkerInnerOutput<OwnerType, ExtOut, ChannelId, Event, SCfg>> {
        match out {
            EchoTaskOutput::Net(out) => Some(WorkerInnerOutput::Net(owner, out)),
            EchoTaskOutput::OnResourceEmpty => {
                self.tasks.remove_task(owner.index());
                Some(WorkerInnerOutput::Continue)
            }
        }
    }
}

fn main() {
    env_logger::init();
    println!("{}", std::mem::size_of::<EchoWorkerInner>());
    let mut controller =
        Controller::<ExtIn, ExtOut, EchoTaskCfg, ChannelId, Event, 1024>::default();
    controller.add_worker::<OwnerType, _, EchoWorkerInner, PollBackend<_, 1024, 1024>>(
        Duration::from_secs(1),
        (),
        None,
    );
    controller.add_worker::<OwnerType, _, EchoWorkerInner, PollBackend<_, 1024, 1024>>(
        Duration::from_secs(1),
        (),
        None,
    );

    for _i in 0..2 {
        controller.spawn(EchoTaskCfg {
            count: 1000,
            brust_size: 1,
            dest: SocketAddr::from(([127, 0, 0, 1], 10001)),
        });
        controller.spawn(EchoTaskCfg {
            count: 10000,
            brust_size: 1,
            dest: SocketAddr::from(([127, 0, 0, 1], 10002)),
        });
    }

    let term = Arc::new(AtomicBool::new(false));
    signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&term))
        .expect("Should register hook");

    while controller.process().is_some() {
        if term.load(Ordering::Relaxed) {
            controller.shutdown();
        }
        std::thread::sleep(Duration::from_millis(10));
    }

    log::info!("Server shutdown");
}