io_engine/scheduler/
context.rs

/*
Copyright (c) NaturalIO Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

use std::{
    cell::UnsafeCell,
    collections::VecDeque,
    io,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    thread,
};

use crossbeam::{
    channel::{Receiver, Sender, bounded},
    queue::SegQueue,
};
use libc::c_long;
use nix::errno::Errno;
use parking_lot::Mutex;

use super::{aio, callback_worker::*, tasks::*};

struct IOContextInner<C: IOCallbackCustom> {
    // The context handle for submitting AIO requests to the kernel.
    context: aio::aio_context_t,

    depth: usize,
    slots: UnsafeCell<Vec<IOEventTaskSlot<C>>>,
    prio_count: AtomicUsize,
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    total_count: AtomicUsize,
    // Shared by the submitting worker and the polling worker.
    prio_queue: SegQueue<Box<IOEvent<C>>>,
    read_queue: SegQueue<Box<IOEvent<C>>>,
    write_queue: SegQueue<Box<IOEvent<C>>>,
    threads: Mutex<Vec<thread::JoinHandle<()>>>,
    running: AtomicBool,
    cb_workers: IOWorkers<C>,
    free_slots_count: AtomicUsize,
}

unsafe impl<C: IOCallbackCustom> Send for IOContextInner<C> {}

unsafe impl<C: IOCallbackCustom> Sync for IOContextInner<C> {}

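// `IOContext` pairs one kernel AIO context with two background threads: a
// submit worker and a poll worker. A minimal usage sketch (`MyCallback` is a
// hypothetical `IOCallbackCustom` implementation; building the `IOWorkers`
// callback pool is assumed to happen elsewhere):
//
//     let workers: IOWorkers<MyCallback> = /* callback worker pool */;
//     let ctx = IOContext::new(128, &workers)?; // 128 in-flight iocb slots
//     ctx.submit(event, IOChannelType::Read)?;  // queued, then batched into
//                                               // io_submit(2) by the worker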
pub struct IOContext<C: IOCallbackCustom> {
    inner: Arc<IOContextInner<C>>,
    noti_sender: Sender<()>,
}

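// The channel only affects scheduling order in the submit worker: `Prio`
// events are drained first on every pass, while `Read` and `Write` are
// drained in alternating order so neither side starves the other.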
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum IOChannelType {
    Prio = 0,
    Read = 1,
    Write = 2,
}

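// Shutdown path: dropping the `IOContext` clears `running` and (by dropping
// `noti_sender`) disconnects the notification channel, which makes the submit
// worker exit. The poll worker keeps reaping completions until every slot is
// free again, then tears the kernel context down with io_destroy.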
impl<C: IOCallbackCustom> Drop for IOContext<C> {
    fn drop(&mut self) {
        info!("IOContext dropped, stopping workers");
        self.inner.running.store(false, Ordering::SeqCst);
    }
}

impl<C: IOCallbackCustom> IOContext<C> {
    pub fn new(depth: usize, cbs: &IOWorkers<C>) -> Result<Arc<Self>, io::Error> {
        let mut context: aio::aio_context_t = 0;
        if aio::io_setup(depth as c_long, &mut context) != 0 {
            return Err(io::Error::last_os_error());
        }
        let (s_noti, r_noti) = bounded::<()>(1);
        let (s_free, r_free) = bounded::<u16>(depth);
        // Every slot starts out free.
        for i in 0..depth {
            let _ = s_free.send(i as u16);
        }
        let mut slots = Vec::with_capacity(depth);
        for slot_id in 0..depth {
            slots.push(IOEventTaskSlot::new(slot_id as u64));
        }
        let inner = Arc::new(IOContextInner {
            context,
            depth,
            slots: UnsafeCell::new(slots),
            running: AtomicBool::new(true),
            threads: Mutex::new(Vec::new()),
            prio_count: AtomicUsize::new(0),
            read_count: AtomicUsize::new(0),
            write_count: AtomicUsize::new(0),
            total_count: AtomicUsize::new(0),
            prio_queue: SegQueue::new(),
            read_queue: SegQueue::new(),
            write_queue: SegQueue::new(),
            cb_workers: cbs.clone(),
            free_slots_count: AtomicUsize::new(depth),
        });

        {
            let mut threads = inner.threads.lock();
            let inner1 = inner.clone();
            let th = thread::spawn(move || inner1.worker_submit(r_noti, r_free));
            threads.push(th);
            let inner2 = inner.clone();
            let sender_free = s_free.clone();
            let th = thread::spawn(move || inner2.worker_poll(sender_free));
            threads.push(th);
        }
        Ok(Arc::new(Self {
            inner,
            noti_sender: s_noti,
        }))
    }

    #[inline]
    pub fn get_depth(&self) -> usize {
        self.get_inner().depth
    }

    #[inline(always)]
    pub fn submit(
        &self,
        event: Box<IOEvent<C>>,
        channel_type: IOChannelType,
    ) -> Result<(), io::Error> {
        let inner = self.get_inner();
        match channel_type {
            IOChannelType::Prio => {
                let _ = inner.prio_count.fetch_add(1, Ordering::SeqCst);
                inner.prio_queue.push(event);
            }
            IOChannelType::Read => {
                let _ = inner.read_count.fetch_add(1, Ordering::SeqCst);
                inner.read_queue.push(event);
            }
            IOChannelType::Write => {
                let _ = inner.write_count.fetch_add(1, Ordering::SeqCst);
                inner.write_queue.push(event);
            }
        }
        inner.total_count.fetch_add(1, Ordering::SeqCst);
        // Wake the submit worker; a full notification channel means it is
        // already awake, so a failed try_send is fine.
        let _ = self.noti_sender.try_send(());
        Ok(())
    }

    #[inline(always)]
    pub fn pending_count(&self) -> usize {
        self.inner.total_count.load(Ordering::Acquire)
    }

    pub fn running_count(&self) -> usize {
        let inner = self.get_inner();
        let free = inner.free_slots_count.load(Ordering::SeqCst);
        if free > inner.depth {
            0
        } else {
            inner.depth - free
        }
    }

    #[inline(always)]
    fn get_inner(&self) -> &IOContextInner<C> {
        self.inner.as_ref()
    }
}
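
// A hypothetical back-pressure sketch (the loop below is illustrative, not
// part of this module): since `pending_count` and `running_count` are cheap
// atomic loads, a caller can bound its queueing like this:
//
//     while ctx.pending_count() + ctx.running_count() >= ctx.get_depth() {
//         std::thread::yield_now();
//     }
//     ctx.submit(event, IOChannelType::Write)?;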

impl<C: IOCallbackCustom> IOContextInner<C> {
    #[inline(always)]
    fn verify_result(&self, slot: &mut IOEventTaskSlot<C>, info: &aio::io_event) -> bool {
        if info.res <= 0 {
            slot.set_error((-info.res) as i32, &self.cb_workers);
            return true;
        }
        if slot.set_written(info.res as usize, &self.cb_workers) {
            return true;
        }
        trace!("short io, resubmit");
        // Fewer bytes were transferred than requested; resubmit the iocb for
        // the remainder, and return false so the slot id is not recycled.
        let mut arr: [*mut aio::iocb; 1] = [&mut slot.iocb as *mut aio::iocb];
        'submit: loop {
            let result = aio::io_submit(self.context, 1, arr.as_mut_ptr());
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue 'submit;
                }
                error!("io resubmit error: {}", result);
                slot.set_error(-result as i32, &self.cb_workers);
                return true;
            } else if result > 0 {
                return false;
            }
        }
    }

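    // The poll side of the engine: blocks in io_getevents(2) with a 2s
    // timeout (so shutdown is noticed even when idle), resolves completed
    // slots through `verify_result`, and recycles finished slot ids to the
    // submit worker via `free_sender`.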
    fn worker_poll(&self, free_sender: Sender<u16>) {
        let depth = self.depth;
        let mut infos = Vec::<aio::io_event>::with_capacity(depth);
        let context = self.context;
        let slots: &mut Vec<IOEventTaskSlot<C>> = unsafe { &mut *self.slots.get() };
        let ts = aio::timespec {
            tv_sec: 2,
            tv_nsec: 0,
        };
        loop {
            infos.clear();
            let result = aio::io_getevents(
                context,
                1,
                depth as i64,
                infos.as_mut_ptr(),
                &ts as *const aio::timespec as *mut aio::timespec,
            );
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue;
                }
                if !self.running.load(Ordering::Acquire) {
                    // Device error while we are stopping: just exit.
                    break;
                }
                error!("io_getevents errno: {}", -result);
                continue;
            } else if result == 0 {
                if !self.running.load(Ordering::Acquire) {
                    // Wait for all submitted I/O to return before exiting.
                    if self.free_slots_count.load(Ordering::SeqCst) == self.depth {
                        break;
                    }
                }
                continue;
            }
            let _ = self
                .free_slots_count
                .fetch_add(result as usize, Ordering::SeqCst);
            unsafe {
                infos.set_len(result as usize);
            }
            for info in &infos {
                let slot_id = info.data as usize;
                if self.verify_result(&mut slots[slot_id], info) {
                    let _ = free_sender.send(slot_id as u16);
                }
            }
        }
        info!("io_poll worker exit due to closing");
        let _ = aio::io_destroy(self.context);
    }

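    // The submit side: sleeps on `noti_recv` until events are queued, drains
    // the three queues into a local batch of at most `depth` events, claims
    // free slots from `free_recv`, then pushes the batch to the kernel with
    // io_submit(2), retrying on EINTR and on partial submission.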
    fn worker_submit(&self, noti_recv: Receiver<()>, free_recv: Receiver<u16>) {
        let depth = self.depth;
        let mut events = VecDeque::<Box<IOEvent<C>>>::with_capacity(depth);
        let mut iocbs = Vec::<*mut aio::iocb>::with_capacity(depth);
        let context = self.context;
        let slots: &mut Vec<IOEventTaskSlot<C>> = unsafe { &mut *self.slots.get() };
        let mut last_write: bool = false;

        'outer: loop {
            if events.is_empty() && self.total_count.load(Ordering::Acquire) == 0 {
                if noti_recv.recv().is_err() {
                    info!("io_submit worker exit due to closing");
                    return;
                }
            }
            'inner_queue: while events.len() < depth {
                let mut got = false;
                macro_rules! probe_queue {
                    ($queue: expr, $count: expr) => {
                        loop {
                            if events.len() < depth {
                                if let Some(event) = $queue.pop() {
                                    got = true;
                                    $count.fetch_sub(1, Ordering::SeqCst);
                                    self.total_count.fetch_sub(1, Ordering::SeqCst);
                                    events.push_back(event);
                                } else {
                                    break;
                                }
                            } else {
                                break 'inner_queue;
                            }
                        }
                    };
                }
                if self.prio_count.load(Ordering::Acquire) > 0 {
                    probe_queue!(self.prio_queue, self.prio_count);
                }
                // Alternate between the read and write queues so neither starves.
                if last_write {
                    last_write = false;
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                } else {
                    last_write = true;
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                }
                if got {
                    // We got something from a queue in this pass; try to get more.
                    continue 'inner_queue;
                } else if !events.is_empty() {
                    // Queues are drained but we have a batch; go submit it.
                    break;
                } else {
                    // Nothing queued at all; go back to blocking.
                    continue 'outer;
                }
            }
            log_debug_assert!(
                events.len() <= self.depth,
                "events {} {} {}",
                events.len(),
                events.capacity(),
                self.depth
            );
            log_debug_assert!(!events.is_empty(), "events.len() > 0");
            while !events.is_empty() {
                let slot_id = {
                    if iocbs.is_empty() {
                        // Block for the first free slot so the batch always makes progress.
                        free_recv.recv().unwrap()
                    } else if let Ok(slot_id) = free_recv.try_recv() {
                        slot_id
                    } else {
                        // No more free slots right now; submit what we already have.
                        break;
                    }
                };
                let event = events.pop_front().unwrap();
                let slot = &mut slots[slot_id as usize];
                slot.fill_slot(event, slot_id);
                iocbs.push(&mut slot.iocb as *mut aio::iocb);
            }
            let mut done: libc::c_long = 0;
            let mut left = iocbs.len();
            if left > 0 {
                'submit: loop {
                    let _ = self.free_slots_count.fetch_sub(left, Ordering::SeqCst);
                    let result = unsafe {
                        let arr = iocbs.as_mut_ptr().add(done as usize);
                        aio::io_submit(context, left as libc::c_long, arr)
                    };
                    if result < 0 {
                        // Submit failed: return the claimed slots to the counter.
                        let _ = self.free_slots_count.fetch_add(left, Ordering::SeqCst);
                        if -result == Errno::EINTR as i64 {
                            continue 'submit;
                        }
                        error!("io_submit error: {}", result);
                        // TODO Error
                    } else if result == left as libc::c_long {
                        trace!("io submit {} events", result);
                        break 'submit;
                    } else {
                        // Partial submission: give back the unsubmitted part and retry.
                        let _ = self
                            .free_slots_count
                            .fetch_add(left - result as usize, Ordering::SeqCst);
                        trace!("io submit {}/{} events", result, left);
                        done += result;
                        left -= result as usize;
                    }
                }
            }
            iocbs.clear();
        }
    }
}