io_engine/scheduler/context.rs

/*
Copyright (c) NaturalIO Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

use std::{
    cell::UnsafeCell,
    collections::VecDeque,
    io,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    thread,
};

use crossbeam::{
    channel::{Receiver, Sender, bounded},
    queue::SegQueue,
};
use libc::c_long;
use nix::errno::Errno;
use parking_lot::Mutex;

use super::{aio, callback_worker::*, tasks::*};

// Shared state between the public handle and the two worker threads.
struct IOContextInner<C: IOCallbackCustom> {
    // the context handle for submitting AIO requests to the kernel
    context: aio::aio_context_t,
    // maximum number of in-flight requests (the queue depth passed to io_setup)
    depth: usize,
    // one slot per possible in-flight request, indexed by the id carried in iocb.data
    slots: UnsafeCell<Vec<IOEventTaskSlot<C>>>,
    prio_count: AtomicUsize,
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    total_count: AtomicUsize,
    // shared by the submitting worker and the polling worker
    prio_queue: SegQueue<Box<IOEvent<C>>>,
    read_queue: SegQueue<Box<IOEvent<C>>>,
    write_queue: SegQueue<Box<IOEvent<C>>>,
    threads: Mutex<Vec<thread::JoinHandle<()>>>,
    running: AtomicBool,
    cb_workers: IOWorkers<C>,
    free_slots_count: AtomicUsize,
}
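
// SAFETY: `slots` is the only field with interior mutability. Each slot is
// owned by exactly one side at a time: the submitting worker fills a slot only
// after receiving its id from the free-slot channel, and the polling worker
// touches it only after the kernel reports a completion for that id. This is
// an invariant of the code below, not something the compiler checks.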
unsafe impl<C: IOCallbackCustom> Send for IOContextInner<C> {}

unsafe impl<C: IOCallbackCustom> Sync for IOContextInner<C> {}
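
/// Handle for submitting AIO events to this context.
///
/// A minimal usage sketch, assuming an `IOWorkers` pool and an
/// `IOCallbackCustom` implementation; `MyCallback`, `workers` and `event`
/// below are illustrative placeholders, not names defined in this module:
///
/// ```ignore
/// let ctx = IOContext::<MyCallback>::new(128, &workers)?;
/// // `event` is a prepared Box<IOEvent<MyCallback>>
/// ctx.submit(event, IOChannelType::Write)?;
/// ```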
pub struct IOContext<C: IOCallbackCustom> {
    inner: Arc<IOContextInner<C>>,
    noti_sender: Sender<()>,
}

/// Selects the scheduling queue for an event: `Prio` is always drained first;
/// `Read` and `Write` are drained in alternating order so neither direction
/// starves the other.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum IOChannelType {
    Prio = 0,
    Read = 1,
    Write = 2,
}

impl<C: IOCallbackCustom> Drop for IOContext<C> {
    fn drop(&mut self) {
        debug!("IOContext dropped, stopping workers");
        self.inner.running.store(false, Ordering::SeqCst);
    }
}
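
// Shutdown sequence: dropping the IOContext clears `running` and drops
// `noti_sender`. The submitting worker exits once `recv()` fails on the
// closed notification channel; the polling worker keeps reaping completions
// until every slot is free again, then calls io_destroy on the kernel context.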

impl<C: IOCallbackCustom> IOContext<C> {
    /// Sets up a kernel AIO context with `depth` in-flight slots and spawns
    /// the submitting and polling worker threads.
    pub fn new(depth: usize, cbs: &IOWorkers<C>) -> Result<Arc<Self>, io::Error> {
        let mut context: aio::aio_context_t = 0;
        if aio::io_setup(depth as c_long, &mut context) != 0 {
            return Err(io::Error::last_os_error());
        }
        let (s_noti, r_noti) = bounded::<()>(1);
        let (s_free, r_free) = bounded::<u16>(depth);
        for i in 0..depth {
            let _ = s_free.send(i as u16);
        }
        let mut slots = Vec::with_capacity(depth);
        for slot_id in 0..depth {
            slots.push(IOEventTaskSlot::new(slot_id as u64));
        }
        let inner = Arc::new(IOContextInner {
            context,
            depth,
            slots: UnsafeCell::new(slots),
            running: AtomicBool::new(true),
            threads: Mutex::new(Vec::new()),
            prio_count: AtomicUsize::new(0),
            read_count: AtomicUsize::new(0),
            write_count: AtomicUsize::new(0),
            total_count: AtomicUsize::new(0),
            prio_queue: SegQueue::new(),
            read_queue: SegQueue::new(),
            write_queue: SegQueue::new(),
            cb_workers: cbs.clone(),
            free_slots_count: AtomicUsize::new(depth),
        });

        {
            let mut threads = inner.threads.lock();
            let inner1 = inner.clone();
            let th = thread::spawn(move || inner1.worker_submit(r_noti, r_free));
            threads.push(th);
            let inner2 = inner.clone();
            let sender_free = s_free.clone();
            let th = thread::spawn(move || inner2.worker_poll(sender_free));
            threads.push(th);
        }
        Ok(Arc::new(Self { inner, noti_sender: s_noti }))
    }

    #[inline]
    pub fn get_depth(&self) -> usize {
        self.get_inner().depth
    }

    /// Queues an event on the chosen channel and wakes the submitting worker.
    #[inline(always)]
    pub fn submit(
        &self, event: Box<IOEvent<C>>, channel_type: IOChannelType,
    ) -> Result<(), io::Error> {
        let inner = self.get_inner();
        match channel_type {
            IOChannelType::Prio => {
                let _ = inner.prio_count.fetch_add(1, Ordering::SeqCst);
                inner.prio_queue.push(event);
            }
            IOChannelType::Read => {
                let _ = inner.read_count.fetch_add(1, Ordering::SeqCst);
                inner.read_queue.push(event);
            }
            IOChannelType::Write => {
                let _ = inner.write_count.fetch_add(1, Ordering::SeqCst);
                inner.write_queue.push(event);
            }
        }
        inner.total_count.fetch_add(1, Ordering::SeqCst);
        // a full notification channel just means a wakeup is already pending
        let _ = self.noti_sender.try_send(());
        Ok(())
    }

    /// Number of events queued but not yet handed to the kernel.
    #[inline(always)]
    pub fn pending_count(&self) -> usize {
        self.inner.total_count.load(Ordering::Acquire)
    }

    /// Approximate number of requests currently in flight in the kernel.
    pub fn running_count(&self) -> usize {
        let inner = self.get_inner();
        let free = inner.free_slots_count.load(Ordering::SeqCst);
        if free > inner.depth { 0 } else { inner.depth - free }
    }

    #[inline(always)]
    fn get_inner(&self) -> &IOContextInner<C> {
        self.inner.as_ref()
    }
}

impl<C: IOCallbackCustom> IOContextInner<C> {
    /// Handles one completion event. Returns true when the slot is finished
    /// (either success or error) and can be recycled; returns false when the
    /// IO was short and the iocb was resubmitted, so the slot stays in flight.
    #[inline(always)]
    fn verify_result(&self, slot: &mut IOEventTaskSlot<C>, info: &aio::io_event) -> bool {
        // res < 0 carries a negative errno; res == 0 (no progress) is also
        // treated as an error
        if info.res <= 0 {
            slot.set_error((-info.res) as i32, &self.cb_workers);
            return true;
        }
        if slot.set_written(info.res as usize, &self.cb_workers) {
            return true;
        }
        trace!("short io, resubmitting");
        // Partial transfer: not all requested bytes were processed, resubmit.
        let mut arr: [*mut aio::iocb; 1] = [&mut slot.iocb as *mut aio::iocb];
        'submit: loop {
            let result = aio::io_submit(self.context, 1, arr.as_mut_ptr());
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue 'submit;
                }
                error!("io_submit (resubmit) error: {}", result);
                slot.set_error(-result as i32, &self.cb_workers);
                return true;
            } else if result > 0 {
                return false;
            }
        }
    }
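
    // Polling worker: blocks in io_getevents with a 2-second timeout (so a
    // shutdown request is noticed), runs every completion through
    // verify_result, and recycles finished slot ids through the free-slot
    // channel back to the submitting worker.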
    fn worker_poll(&self, free_sender: Sender<u16>) {
        let depth = self.depth;
        let mut infos = Vec::<aio::io_event>::with_capacity(depth);
        let context = self.context;
        let slots: &mut Vec<IOEventTaskSlot<C>> = unsafe { &mut *self.slots.get() };
        let ts = aio::timespec { tv_sec: 2, tv_nsec: 0 };
        loop {
            infos.clear();
            let result = aio::io_getevents(
                context,
                1,
                depth as i64,
                infos.as_mut_ptr(),
                &ts as *const aio::timespec as *mut aio::timespec,
            );
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue;
                }
                if !self.running.load(Ordering::Acquire) {
                    // device error while we are stopping
                    break;
                }
                error!("io_getevents errno: {}", -result);
                continue;
            } else if result == 0 {
                if !self.running.load(Ordering::Acquire) {
                    // wait for all submitted IOs to return before destroying the context
                    if self.free_slots_count.load(Ordering::SeqCst) == self.depth {
                        break;
                    }
                }
                continue;
            }
            unsafe {
                infos.set_len(result as usize);
            }
            for info in &infos {
                let slot_id = info.data as usize;
                if self.verify_result(&mut slots[slot_id], info) {
                    // Count a slot as free only once its IO has fully
                    // completed; a resubmitted short IO stays in flight.
                    let _ = self.free_slots_count.fetch_add(1, Ordering::SeqCst);
                    let _ = free_sender.send(slot_id as u16);
                }
            }
        }
        info!("io_poll worker exiting due to shutdown");
        let _ = aio::io_destroy(self.context);
    }
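
    // Submitting worker: drains the three queues into a local batch (prio
    // first, then read and write in alternating order via `last_write`),
    // claims free slot ids, and hands batches to the kernel with io_submit,
    // retrying on EINTR and on partial submission.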
    fn worker_submit(&self, noti_recv: Receiver<()>, free_recv: Receiver<u16>) {
        let depth = self.depth;
        let mut events = VecDeque::<Box<IOEvent<C>>>::with_capacity(depth);
        let mut iocbs = Vec::<*mut aio::iocb>::with_capacity(depth);
        let context = self.context;
        let slots: &mut Vec<IOEventTaskSlot<C>> = unsafe { &mut *self.slots.get() };
        let mut last_write: bool = false;

        'outer: loop {
            if events.is_empty() && self.total_count.load(Ordering::Acquire) == 0 {
                if noti_recv.recv().is_err() {
                    info!("io_submit worker exiting due to shutdown");
                    return;
                }
            }
            'inner_queue: while events.len() < depth {
                let mut got = false;
                macro_rules! probe_queue {
                    ($queue: expr, $count: expr) => {
                        loop {
                            if events.len() < depth {
                                if let Some(event) = $queue.pop() {
                                    got = true;
                                    $count.fetch_sub(1, Ordering::SeqCst);
                                    self.total_count.fetch_sub(1, Ordering::SeqCst);
                                    events.push_back(event);
                                } else {
                                    break;
                                }
                            } else {
                                break 'inner_queue;
                            }
                        }
                    };
                }
                if self.prio_count.load(Ordering::Acquire) > 0 {
                    probe_queue!(self.prio_queue, self.prio_count);
                }
                if last_write {
                    last_write = false;
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                } else {
                    last_write = true;
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                }
                if got {
                    // pulled something from a queue on this pass, try for more
                    continue 'inner_queue;
                } else {
                    // the queues are empty
                    if !events.is_empty() {
                        break;
                    } else {
                        // go back to blocking on the notification channel
                        continue 'outer;
                    }
                }
            }
            log_debug_assert!(
                events.len() <= self.depth,
                "events {} {} {}",
                events.len(),
                events.capacity(),
                self.depth
            );
            log_debug_assert!(!events.is_empty(), "events must not be empty");
            while !events.is_empty() {
                let slot_id = {
                    if iocbs.is_empty() {
                        // block until at least one slot becomes free
                        free_recv.recv().unwrap()
                    } else if let Ok(_slot_id) = free_recv.try_recv() {
                        _slot_id
                    } else {
                        break;
                    }
                };
                let event = events.pop_front().unwrap();
                let slot = &mut slots[slot_id as usize];
                slot.fill_slot(event, slot_id);
                iocbs.push(&mut slot.iocb as *mut aio::iocb);
            }
            let mut done: libc::c_long = 0;
            let mut left = iocbs.len();
            if left > 0 {
                'submit: loop {
                    let _ = self.free_slots_count.fetch_sub(left, Ordering::SeqCst);
                    let result = unsafe {
                        let arr = iocbs.as_mut_ptr().add(done as usize);
                        aio::io_submit(context, left as libc::c_long, arr)
                    };
                    if result < 0 {
                        // submit failed: give the reserved slots back
                        let _ = self.free_slots_count.fetch_add(left, Ordering::SeqCst);
                        if -result == Errno::EINTR as i64 {
                            continue 'submit;
                        }
                        error!("io_submit error: {}", result);
                        // TODO Error
                    } else if result == left as libc::c_long {
                        trace!("io submit {} events", result);
                        break 'submit;
                    } else {
                        // partial submit: return the unsubmitted slots for now;
                        // the next iteration reserves them again before retrying
                        let _ = self
                            .free_slots_count
                            .fetch_add(left - result as usize, Ordering::SeqCst);
                        trace!("io submit {}/{} events", result, left);
                        done += result;
                        left -= result as usize;
                    }
                }
            }
            iocbs.clear();
        }
    }
}