// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, StreamExt, TryFutureExt};
use object_store::path::Path;
use snafu::{location, Location};
use std::future::Future;
use std::ops::Range;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete,
// the MutableBatch goes out of scope and the batch request is considered
// complete.
struct MutableBatch<F: FnOnce(Result<Vec<Bytes>>) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Result<Vec<Bytes>>) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            err: None,
        }
    }
}

// Rather than keep track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us.  When all I/Os are
// done, the MutableBatch will go out of scope and we know we have all the
// data.
impl<F: FnOnce(Result<Vec<Bytes>>) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let data = std::mem::take(&mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        (self.when_done.take().unwrap())(result);
    }
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: Result<(usize, Bytes)>);
}

impl<F: FnOnce(Result<Vec<Bytes>>) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: Result<(usize, Bytes)>) {
        match data {
            Ok(data) => {
                self.data_buffers[data.0] = data.1;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
}

impl IoTask {
    async fn run(self) {
        let bytes = self
            .reader
            .get_range(self.to_read.start as usize..self.to_read.end as usize)
            .await;
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// runs until the scheduler is destroyed.
async fn run_io_loop(tasks: mpsc::UnboundedReceiver<IoTask>, io_capacity: u32) {
    let io_stream = UnboundedReceiverStream::new(tasks);
    let tokio_task_stream = io_stream.map(|task| tokio::spawn(task.run()));
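    // Because `map` is lazy, a task is only spawned when `buffer_unordered`
    // polls for it, so at most `io_capacity` I/O tasks are in flight at once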
    let mut tokio_task_stream = tokio_task_stream.buffer_unordered(io_capacity as usize);
    while tokio_task_stream.next().await.is_some() {
        // We don't actually do anything with the results here; they are delivered
        // via each I/O task's when_done callback.  We just keep draining the
        // stream until the tasks receiver returns None (the scheduler has been
        // shut down)
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
pub struct StoreScheduler {
    object_store: Arc<ObjectStore>,
    io_submitter: mpsc::UnboundedSender<IoTask>,
}

impl StoreScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * io_capacity - the maximum number of parallel requests that will be allowed
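    ///
    /// # Examples
    ///
    /// A minimal sketch of constructing a scheduler over a local store.  The
    /// `lance_io` import paths below are assumptions; adjust them to wherever
    /// this module is exposed.
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use lance_io::object_store::ObjectStore;
    /// use lance_io::scheduler::StoreScheduler;
    ///
    /// # async fn example() {
    /// // Allow at most 16 I/O requests to run in parallel.  Note that `new`
    /// // spawns the I/O loop task, so call it from within a Tokio runtime.
    /// let scheduler = StoreScheduler::new(Arc::new(ObjectStore::local()), 16);
    /// # }
    /// ```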
    pub fn new(object_store: Arc<ObjectStore>, io_capacity: u32) -> Arc<Self> {
        // TODO: we don't have any backpressure in place if the compute thread falls
        // behind.  The scheduler thread will schedule ALL of the I/O and then the
        // loaded data will eventually pile up.
        //
        // We could bound this channel but that wouldn't help.  If the decode thread
        // was paused then the I/O loop would keep running and reading from this channel.
        //
        // Once the reader is finished we should revisit.  We will probably want to convert
        // from `when_done` futures to delivering data into a queue.  That queue should fill
        // up, causing the I/O loop to pause.
        let (reg_tx, reg_rx) = mpsc::unbounded_channel();
        let scheduler = Self {
            object_store,
            io_submitter: reg_tx,
        };
        tokio::task::spawn(async move { run_io_loop(reg_rx, io_capacity).await });
        Arc::new(scheduler)
    }

    /// Open a file for reading
    pub async fn open_file(self: &Arc<Self>, path: &Path) -> Result<FileScheduler> {
        let reader = self.object_store.open(path).await?;
        Ok(FileScheduler {
            reader: reader.into(),
            root: self.clone(),
        })
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Result<Vec<Bytes>>>,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes| {
            // We don't care if the receiver has given up
            let _ = tx.send(bytes);
        };

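        // Shared by every I/O task in this request.  The Mutex guards
        // concurrent delivery and the Arc's reference count tracks outstanding
        // tasks: when the last task drops its clone, MutableBatch's Drop impl
        // runs and delivers the assembled result.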
        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                when_done: Box::new(move |bytes| {
                    let mut dest = dest.lock().unwrap();
                    dest.deliver_data(bytes.map(|bytes| (task_idx, bytes)));
                }),
            };
            if self.io_submitter.send(task).is_err() {
                panic!("unable to submit I/O because the I/O thread has panicked");
            }
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Result<Vec<Bytes>>>();

        self.do_submit_request(reader, request, tx);

        // Right now, it isn't possible for I/O to be cancelled, so a cancel error
        // should not occur
        rx.map(|wrapped_result| wrapped_result.unwrap())
    }
}

/// A throttled file reader
#[derive(Clone)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<StoreScheduler>,
}

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
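    ///
    /// # Examples
    ///
    /// A sketch of batching two reads into one request.  The import paths and
    /// the file name are illustrative assumptions:
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use object_store::path::Path;
    /// use lance_io::object_store::ObjectStore;
    /// use lance_io::scheduler::StoreScheduler;
    ///
    /// # async fn example() -> lance_core::Result<()> {
    /// let scheduler = StoreScheduler::new(Arc::new(ObjectStore::local()), 16);
    /// let file = scheduler.open_file(&Path::parse("foo.file").unwrap()).await?;
    /// // Both ranges are scheduled together; the future resolves once both
    /// // reads finish, yielding one `Bytes` buffer per range, in request order
    /// let buffers = file.submit_request(vec![0..1024, 4096..8192]).await?;
    /// assert_eq!(buffers.len(), 2);
    /// # Ok(())
    /// # }
    /// ```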
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        self.root.submit_request(self.reader.clone(), request)
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPs to perform then [`Self::submit_request`] is going
    /// to be more efficient.
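    ///
    /// # Examples
    ///
    /// A sketch of a single-range read (import paths and file name are
    /// illustrative assumptions):
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use object_store::path::Path;
    /// use lance_io::object_store::ObjectStore;
    /// use lance_io::scheduler::StoreScheduler;
    ///
    /// # async fn example() -> lance_core::Result<()> {
    /// let scheduler = StoreScheduler::new(Arc::new(ObjectStore::local()), 16);
    /// let file = scheduler.open_file(&Path::parse("foo.file").unwrap()).await?;
    /// // Read the first 4 KiB of the file as a single IOP
    /// let first_page = file.submit_single(0..4096).await?;
    /// # Ok(())
    /// # }
    /// ```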
    pub fn submit_single(&self, range: Range<u64>) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range])
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPs or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use rand::RngCore;
    use tempfile::tempdir;

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let scheduler = StoreScheduler::new(obj_store, 16);

        let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler.submit_request(vec![offset..offset + READ_SIZE]),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap().await.unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }
}