1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
//! Simple queue of asynchronous I/O work.

use super::*;
use log::debug;
use std::{collections::VecDeque, marker::PhantomPinned, pin::Pin, sync::Mutex};

/// Nng asynchronous I/O handle and queue of work items.
pub struct SimpleAioWorkQueue {
    worker: Pin<Box<AioQueue>>,
}

impl SimpleAioWorkQueue {
    pub fn new() -> Result<Self> {
        let worker = AioQueue::new()?;
        Ok(Self { worker })
    }
}

impl AioWorkQueue for SimpleAioWorkQueue {
    fn push_back(&mut self, obj: AioWorkRequest) {
        // Get mutable reference to pinned struct
        let inner: &mut _ = unsafe {
            let mut_ref = Pin::as_mut(&mut self.worker);
            Pin::get_unchecked_mut(mut_ref)
        };
        let mut shared = inner.shared.lock().unwrap();
        match shared.state {
            State::Idle => {
                shared.state = State::Busy;
                obj.begin(&inner.aio);
            }
            State::Busy => {}
        }
        shared.queue.push_back(obj);
    }
}

#[derive(Debug, PartialEq)]
enum State {
    Idle,
    Busy,
}

impl Default for State {
    fn default() -> Self {
        State::Idle
    }
}

#[derive(Default)]
struct SharedQueueData {
    state: State,
    queue: VecDeque<AioWorkRequest>,
}

struct AioQueue {
    aio: NngAio,
    shared: Mutex<SharedQueueData>,
    _phantom: PhantomPinned,
}

impl AioQueue {
    fn new_with_aio(aio: NngAio) -> AioQueue {
        Self {
            aio,
            shared: Default::default(),
            _phantom: PhantomPinned,
        }
    }

    pub fn new() -> Result<AioArg<AioQueue>> {
        NngAio::create(Self::new_with_aio, native_callback)
    }

    fn callback(&mut self) {
        let mut shared = self.shared.lock().unwrap();
        let front = shared.queue.pop_front();
        if shared.state != State::Idle {
            if let Some(mut front) = front {
                front.finish(self.aio());
                if let Some(next) = shared.queue.front() {
                    next.begin(self.aio());
                } else {
                    shared.state = State::Idle;
                }
                return;
            }
        }
        let res = unsafe { nng_int_to_result(nng_aio_result(self.aio.nng_aio())) };
        debug!("Unexpected callback: {:?}", res);
    }
}

impl Aio for AioQueue {
    fn aio(&self) -> &NngAio {
        &self.aio
    }
    fn aio_mut(&mut self) -> &mut NngAio {
        &mut self.aio
    }
}

impl std::fmt::Display for AioQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "AioQueue {:?}", self.aio)
    }
}

unsafe extern "C" fn native_callback(arg: AioArgPtr) {
    let ctx = &mut *(arg as *mut AioQueue);
    ctx.callback();
}