Skip to main content

lance_io/utils/
tracking_store.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Make assertions about IO operations performed against an [ObjectStore].
//!
//! When testing code that performs IO, you will often want to make assertions
//! about the number of reads and writes performed, the amount of data read or
//! written, and the number of disjoint periods where at least one IO is in-flight.
//!
//! This module provides [`IOTracker`], which can be used to wrap any object store.
11use std::fmt::{Display, Formatter};
12use std::ops::Range;
13#[cfg(feature = "test-util")]
14use std::sync::atomic::AtomicU16;
15use std::sync::{Arc, Mutex};
16
17use bytes::Bytes;
18use futures::StreamExt;
19use futures::TryStreamExt;
20use futures::stream::BoxStream;
21use object_store::path::Path;
22use object_store::{
23    CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
24    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions,
25    Result as OSResult, UploadPart,
26};
27
28use crate::object_store::WrappingObjectStore;
29
30#[derive(Debug, Default, Clone)]
31pub struct IOTracker(Arc<Mutex<IoStats>>);
32
33impl IOTracker {
34    /// Get IO statistics and reset the counters (incremental pattern).
35    ///
36    /// This returns the accumulated statistics since the last call and resets
37    /// the internal counters to zero.
38    pub fn incremental_stats(&self) -> IoStats {
39        std::mem::take(&mut *self.0.lock().unwrap())
40    }
41
42    /// Get a snapshot of current IO statistics without resetting counters.
43    ///
44    /// This returns a clone of the current statistics without modifying the
45    /// internal state. Use this when you need to check stats without resetting.
46    pub fn stats(&self) -> IoStats {
47        self.0.lock().unwrap().clone()
48    }
49
50    /// Record a read operation for tracking.
51    ///
52    /// This is used by readers that bypass the ObjectStore layer (like LocalObjectReader)
53    /// to ensure their IO operations are still tracked.
54    pub fn record_read(
55        &self,
56        #[allow(unused_variables)] method: &'static str,
57        #[allow(unused_variables)] path: Path,
58        num_bytes: u64,
59        #[allow(unused_variables)] range: Option<Range<u64>>,
60    ) {
61        let mut stats = self.0.lock().unwrap();
62        stats.read_iops += 1;
63        stats.read_bytes += num_bytes;
64        #[cfg(feature = "test-util")]
65        stats.requests.push(IoRequestRecord {
66            method,
67            path,
68            range,
69        });
70    }
71
72    /// Record a write operation for tracking.
73    ///
74    /// This is used by writers that bypass the ObjectStore layer (like LocalWriter)
75    /// to ensure their IO operations are still tracked.
76    pub fn record_write(
77        &self,
78        #[allow(unused_variables)] method: &'static str,
79        #[allow(unused_variables)] path: Path,
80        num_bytes: u64,
81    ) {
82        let mut stats = self.0.lock().unwrap();
83        stats.write_iops += 1;
84        stats.written_bytes += num_bytes;
85        #[cfg(feature = "test-util")]
86        stats.requests.push(IoRequestRecord {
87            method,
88            path,
89            range: None,
90        });
91    }
92}
93
94impl WrappingObjectStore for IOTracker {
95    fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
96        Arc::new(IoTrackingStore::new(target, self.0.clone()))
97    }
98}
99
/// Counters accumulated by an [`IOTracker`] or an [`IoTrackingStore`].
#[derive(Debug, Default, Clone)]
pub struct IoStats {
    /// Number of read requests (gets, range reads and listings).
    pub read_iops: u64,
    /// Total number of bytes read.
    pub read_bytes: u64,
    /// Number of write requests (puts, multipart parts, copies, renames and deletes).
    pub write_iops: u64,
    /// Total number of bytes written.
    pub written_bytes: u64,
    /// Number of disjoint periods where at least one IO is in-flight.
    ///
    /// This is only really meaningful in tests where there isn't any concurrent IO.
    #[cfg(feature = "test-util")]
    pub num_stages: u64,
    /// Log of individual requests, kept for test diagnostics.
    #[cfg(feature = "test-util")]
    pub requests: Vec<IoRequestRecord>,
}
113
/// Assert that a field of an `IoStats` equals an expected value.
///
/// On failure the panic message includes the field name, the expected and
/// actual values, and the recorded requests. An optional trailing format
/// string with arguments is appended to the message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once. This matters
/// when `$io_stats` is an expression such as `tracker.incremental_stats()`.
///
/// ```ignore
/// assert_io_eq!(io_stats, read_iops, 1);
/// assert_io_eq!(io_stats, write_iops, 0, "should be no writes");
/// assert_io_eq!(io_stats, num_stages, 1, "should be just {}", "one stage");
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_eq {
    ($io_stats:expr, $field:ident, $expected:expr) => {{
        // Bind once so the failure message reports the same stats that were compared.
        let stats = &$io_stats;
        let expected = &$expected;
        assert_eq!(
            stats.$field, *expected,
            "Expected {} to be {}, got {}. Requests: {:#?}",
            stringify!($field),
            expected,
            stats.$field,
            stats.requests
        );
    }};
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {{
        // Bind once so the failure message reports the same stats that were compared.
        let stats = &$io_stats;
        let expected = &$expected;
        assert_eq!(
            stats.$field, *expected,
            "Expected {} to be {}, got {}. Requests: {:#?} {}",
            stringify!($field),
            expected,
            stats.$field,
            stats.requests,
            format_args!($($arg)+)
        );
    }};
}
143
/// Assert that a field of an `IoStats` is strictly greater than a value.
///
/// Accepts the same argument forms as `assert_io_eq!`. On failure the panic
/// message includes the field name, the bound, the actual value, and the
/// recorded requests. `$io_stats` and `$expected` are each evaluated once.
///
/// ```ignore
/// assert_io_gt!(io_stats, read_bytes, 0, "expected some data to be read");
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_gt {
    ($io_stats:expr, $field:ident, $expected:expr) => {{
        // Bind once so the failure message reports the same stats that were compared.
        let stats = &$io_stats;
        let expected = &$expected;
        assert!(
            stats.$field > *expected,
            "Expected {} to be > {}, got {}. Requests: {:#?}",
            stringify!($field),
            expected,
            stats.$field,
            stats.requests
        );
    }};
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {{
        // Bind once so the failure message reports the same stats that were compared.
        let stats = &$io_stats;
        let expected = &$expected;
        assert!(
            stats.$field > *expected,
            "Expected {} to be > {}, got {}. Requests: {:#?} {}",
            stringify!($field),
            expected,
            stats.$field,
            stats.requests,
            format_args!($($arg)+)
        );
    }};
}
169
/// Assert that a field of an `IoStats` is strictly less than a value.
///
/// Accepts the same argument forms as `assert_io_eq!`. On failure the panic
/// message includes the field name, the bound, the actual value, and the
/// recorded requests. `$io_stats` and `$expected` are each evaluated once.
///
/// ```ignore
/// assert_io_lt!(io_stats, read_iops, 10, "too many reads");
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_lt {
    ($io_stats:expr, $field:ident, $expected:expr) => {{
        // Bind once so the failure message reports the same stats that were compared.
        let stats = &$io_stats;
        let expected = &$expected;
        assert!(
            stats.$field < *expected,
            "Expected {} to be < {}, got {}. Requests: {:#?}",
            stringify!($field),
            expected,
            stats.$field,
            stats.requests
        );
    }};
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {{
        // Bind once so the failure message reports the same stats that were compared.
        let stats = &$io_stats;
        let expected = &$expected;
        assert!(
            stats.$field < *expected,
            "Expected {} to be < {}, got {}. Requests: {:#?} {}",
            stringify!($field),
            expected,
            stats.$field,
            stats.requests,
            format_args!($($arg)+)
        );
    }};
}
195
/// A single IO request captured by the tracker.
///
/// These records only exist for test-only diagnostics (behind the `test-util`
/// feature); the `assert_io_*` macros print them when an assertion fails.
#[cfg(feature = "test-util")]
#[derive(Clone)]
pub struct IoRequestRecord {
    /// Name of the operation, e.g. `"get_opts"` or `"put_part"`.
    pub method: &'static str,
    /// Object path the request targeted.
    pub path: Path,
    /// Byte range of the request, when one was recorded.
    pub range: Option<Range<u64>>,
}

#[cfg(feature = "test-util")]
impl std::fmt::Debug for IoRequestRecord {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // For example: `IORequest(method=get_opts, path="dir/file", range=0..100)`.
        // The `, range=…` part is omitted when no range was recorded.
        write!(f, "IORequest(method={}, path=\"{}\"", self.method, self.path)?;
        if let Some(range) = &self.range {
            write!(f, ", range={:?}", range)?;
        }
        write!(f, ")")
    }
}
221
222impl Display for IoStats {
223    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224        write!(f, "{:#?}", self)
225    }
226}
227
228#[derive(Debug)]
229pub struct IoTrackingStore {
230    target: Arc<dyn ObjectStore>,
231    stats: Arc<Mutex<IoStats>>,
232    #[cfg(feature = "test-util")]
233    active_requests: Arc<AtomicU16>,
234}
235
236impl Display for IoTrackingStore {
237    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
238        write!(f, "{:#?}", self)
239    }
240}
241
242impl IoTrackingStore {
243    pub fn new(target: Arc<dyn ObjectStore>, stats: Arc<Mutex<IoStats>>) -> Self {
244        Self {
245            target,
246            stats,
247            #[cfg(feature = "test-util")]
248            active_requests: Arc::new(AtomicU16::new(0)),
249        }
250    }
251
252    fn record_read(
253        &self,
254        method: &'static str,
255        path: Path,
256        num_bytes: u64,
257        range: Option<Range<u64>>,
258    ) {
259        let mut stats = self.stats.lock().unwrap();
260        stats.read_iops += 1;
261        stats.read_bytes += num_bytes;
262        #[cfg(feature = "test-util")]
263        stats.requests.push(IoRequestRecord {
264            method,
265            path,
266            range,
267        });
268        #[cfg(not(feature = "test-util"))]
269        let _ = (method, path, range); // Suppress unused variable warnings
270    }
271
272    fn record_write(&self, method: &'static str, path: Path, num_bytes: u64) {
273        let mut stats = self.stats.lock().unwrap();
274        stats.write_iops += 1;
275        stats.written_bytes += num_bytes;
276        #[cfg(feature = "test-util")]
277        stats.requests.push(IoRequestRecord {
278            method,
279            path,
280            range: None,
281        });
282        #[cfg(not(feature = "test-util"))]
283        let _ = (method, path); // Suppress unused variable warnings
284    }
285
286    #[cfg(feature = "test-util")]
287    fn stage_guard(&self) -> StageGuard {
288        StageGuard::new(self.active_requests.clone(), self.stats.clone())
289    }
290
291    #[cfg(not(feature = "test-util"))]
292    fn stage_guard(&self) -> StageGuard {
293        StageGuard
294    }
295}
296
297#[async_trait::async_trait]
298#[deny(clippy::missing_trait_methods)]
299impl ObjectStore for IoTrackingStore {
300    async fn put_opts(
301        &self,
302        location: &Path,
303        bytes: PutPayload,
304        opts: PutOptions,
305    ) -> OSResult<PutResult> {
306        let _guard = self.stage_guard();
307        self.record_write(
308            "put_opts",
309            location.to_owned(),
310            bytes.content_length() as u64,
311        );
312        self.target.put_opts(location, bytes, opts).await
313    }
314
315    async fn put_multipart_opts(
316        &self,
317        location: &Path,
318        opts: PutMultipartOptions,
319    ) -> OSResult<Box<dyn MultipartUpload>> {
320        let _guard = self.stage_guard();
321        let target = self.target.put_multipart_opts(location, opts).await?;
322        Ok(Box::new(IoTrackingMultipartUpload {
323            target,
324            stats: self.stats.clone(),
325            #[cfg(feature = "test-util")]
326            path: location.to_owned(),
327            #[cfg(feature = "test-util")]
328            _guard,
329        }))
330    }
331
332    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
333        let _guard = self.stage_guard();
334        let range = match &options.range {
335            Some(GetRange::Bounded(range)) => Some(range.clone()),
336            _ => None, // TODO: fill in other options.
337        };
338        let result = self.target.get_opts(location, options).await;
339        if let Ok(result) = &result {
340            let num_bytes = result.range.end - result.range.start;
341
342            self.record_read("get_opts", location.to_owned(), num_bytes, range);
343        }
344        result
345    }
346
347    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
348        let _guard = self.stage_guard();
349        let result = self.target.get_ranges(location, ranges).await;
350        if let Ok(result) = &result {
351            self.record_read(
352                "get_ranges",
353                location.to_owned(),
354                result.iter().map(|b| b.len() as u64).sum(),
355                None,
356            );
357        }
358        result
359    }
360
361    fn delete_stream(
362        &self,
363        locations: BoxStream<'static, OSResult<Path>>,
364    ) -> BoxStream<'static, OSResult<Path>> {
365        let stats = Arc::clone(&self.stats);
366        let tracked = locations
367            .map_ok(move |path| {
368                let mut stats = stats.lock().unwrap();
369                stats.write_iops += 1;
370                #[cfg(feature = "test-util")]
371                stats.requests.push(IoRequestRecord {
372                    method: "delete",
373                    path: path.clone(),
374                    range: None,
375                });
376                path
377            })
378            .boxed();
379        self.target.delete_stream(tracked)
380    }
381
382    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
383        let _guard = self.stage_guard();
384        self.record_read("list", prefix.cloned().unwrap_or_default(), 0, None);
385        self.target.list(prefix)
386    }
387
388    fn list_with_offset(
389        &self,
390        prefix: Option<&Path>,
391        offset: &Path,
392    ) -> BoxStream<'static, OSResult<ObjectMeta>> {
393        self.record_read(
394            "list_with_offset",
395            prefix.cloned().unwrap_or_default(),
396            0,
397            None,
398        );
399        self.target.list_with_offset(prefix, offset)
400    }
401
402    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
403        let _guard = self.stage_guard();
404        self.record_read(
405            "list_with_delimiter",
406            prefix.cloned().unwrap_or_default(),
407            0,
408            None,
409        );
410        self.target.list_with_delimiter(prefix).await
411    }
412
413    async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> {
414        let _guard = self.stage_guard();
415        self.record_write("copy", from.to_owned(), 0);
416        self.target.copy_opts(from, to, opts).await
417    }
418
419    async fn rename_opts(&self, from: &Path, to: &Path, opts: RenameOptions) -> OSResult<()> {
420        let _guard = self.stage_guard();
421        self.record_write("rename", from.to_owned(), 0);
422        self.target.rename_opts(from, to, opts).await
423    }
424}
425
426#[derive(Debug)]
427struct IoTrackingMultipartUpload {
428    target: Box<dyn MultipartUpload>,
429    #[cfg(feature = "test-util")]
430    path: Path,
431    stats: Arc<Mutex<IoStats>>,
432    #[cfg(feature = "test-util")]
433    _guard: StageGuard,
434}
435
436#[async_trait::async_trait]
437impl MultipartUpload for IoTrackingMultipartUpload {
438    async fn abort(&mut self) -> OSResult<()> {
439        self.target.abort().await
440    }
441
442    async fn complete(&mut self) -> OSResult<PutResult> {
443        self.target.complete().await
444    }
445
446    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
447        {
448            let mut stats = self.stats.lock().unwrap();
449            stats.write_iops += 1;
450            stats.written_bytes += payload.content_length() as u64;
451            #[cfg(feature = "test-util")]
452            stats.requests.push(IoRequestRecord {
453                method: "put_part",
454                path: self.path.to_owned(),
455                range: None,
456            });
457        }
458        self.target.put_part(payload)
459    }
460}
461
/// RAII guard marking one IO request as in flight.
///
/// Creating a guard increments the shared in-flight counter. Dropping the guard
/// that brings the counter back to zero increments `IoStats::num_stages`, so
/// each disjoint period of activity counts as one stage.
#[cfg(feature = "test-util")]
#[derive(Debug)]
#[must_use = "the IO stage ends as soon as the guard is dropped"]
struct StageGuard {
    active_requests: Arc<AtomicU16>,
    stats: Arc<Mutex<IoStats>>,
}

/// Zero-sized placeholder used when stage tracking is compiled out.
#[cfg(not(feature = "test-util"))]
#[must_use = "the IO stage ends as soon as the guard is dropped"]
struct StageGuard;

#[cfg(feature = "test-util")]
impl StageGuard {
    /// Register one more in-flight request on `active_requests`.
    fn new(active_requests: Arc<AtomicU16>, stats: Arc<Mutex<IoStats>>) -> Self {
        active_requests.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Self {
            active_requests,
            stats,
        }
    }
}

#[cfg(feature = "test-util")]
impl Drop for StageGuard {
    fn drop(&mut self) {
        // `fetch_sub` returns the previous value: 1 means this was the last
        // in-flight request, which closes the current stage.
        if self
            .active_requests
            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
            == 1
        {
            let mut stats = self.stats.lock().unwrap();
            stats.num_stages += 1;
        }
    }
}