Skip to main content

lance_io/utils/
tracking_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Make assertions about IO operations to an [ObjectStore].
5//!
6//! When testing code that performs IO, you will often want to make assertions
7//! about the number of reads and writes performed, the amount of data read or
8//! written, and the number of disjoint periods where at least one IO is in-flight.
9//!
//! This module provides [`IOTracker`] which can be used to wrap any object store.
11use std::fmt::{Display, Formatter};
12use std::ops::Range;
13#[cfg(feature = "test-util")]
14use std::sync::atomic::AtomicU16;
15use std::sync::{Arc, Mutex};
16
17use bytes::Bytes;
18use futures::stream::BoxStream;
19use object_store::path::Path;
20use object_store::{
21    GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
22    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
23};
24
25use crate::object_store::WrappingObjectStore;
26
27#[derive(Debug, Default, Clone)]
28pub struct IOTracker(Arc<Mutex<IoStats>>);
29
30impl IOTracker {
31    /// Get IO statistics and reset the counters (incremental pattern).
32    ///
33    /// This returns the accumulated statistics since the last call and resets
34    /// the internal counters to zero.
35    pub fn incremental_stats(&self) -> IoStats {
36        std::mem::take(&mut *self.0.lock().unwrap())
37    }
38
39    /// Get a snapshot of current IO statistics without resetting counters.
40    ///
41    /// This returns a clone of the current statistics without modifying the
42    /// internal state. Use this when you need to check stats without resetting.
43    pub fn stats(&self) -> IoStats {
44        self.0.lock().unwrap().clone()
45    }
46
47    /// Record a read operation for tracking.
48    ///
49    /// This is used by readers that bypass the ObjectStore layer (like LocalObjectReader)
50    /// to ensure their IO operations are still tracked.
51    pub fn record_read(
52        &self,
53        #[allow(unused_variables)] method: &'static str,
54        #[allow(unused_variables)] path: Path,
55        num_bytes: u64,
56        #[allow(unused_variables)] range: Option<Range<u64>>,
57    ) {
58        let mut stats = self.0.lock().unwrap();
59        stats.read_iops += 1;
60        stats.read_bytes += num_bytes;
61        #[cfg(feature = "test-util")]
62        stats.requests.push(IoRequestRecord {
63            method,
64            path,
65            range,
66        });
67    }
68
69    /// Record a write operation for tracking.
70    ///
71    /// This is used by writers that bypass the ObjectStore layer (like LocalWriter)
72    /// to ensure their IO operations are still tracked.
73    pub fn record_write(
74        &self,
75        #[allow(unused_variables)] method: &'static str,
76        #[allow(unused_variables)] path: Path,
77        num_bytes: u64,
78    ) {
79        let mut stats = self.0.lock().unwrap();
80        stats.write_iops += 1;
81        stats.written_bytes += num_bytes;
82        #[cfg(feature = "test-util")]
83        stats.requests.push(IoRequestRecord {
84            method,
85            path,
86            range: None,
87        });
88    }
89}
90
91impl WrappingObjectStore for IOTracker {
92    fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
93        Arc::new(IoTrackingStore::new(target, self.0.clone()))
94    }
95}
96
/// Counters describing the IO recorded through an [`IOTracker`].
///
/// `Default` yields all-zero counters (and, with `test-util`, an empty
/// request list).
#[derive(Debug, Default, Clone)]
pub struct IoStats {
    /// Number of recorded read operations. Metadata calls such as `head` and
    /// the `list*` family are counted here too, with zero bytes.
    pub read_iops: u64,
    /// Total number of bytes reported by recorded read operations.
    pub read_bytes: u64,
    /// Number of recorded write operations. `delete`, `copy*` and `rename*`
    /// are counted here with zero bytes, and each multipart part counts as one.
    pub write_iops: u64,
    /// Total number of bytes reported by recorded write operations.
    pub written_bytes: u64,
    // This is only really meaningful in tests where there isn't any concurrent IO.
    #[cfg(feature = "test-util")]
    /// Number of disjoint periods where at least one IO is in-flight.
    pub num_stages: u64,
    #[cfg(feature = "test-util")]
    /// One entry per recorded operation, in the order they were recorded.
    pub requests: Vec<IoRequestRecord>,
}
110
/// Assert that a field of an `IoStats` value equals an expected value.
///
/// On failure the panic message names the field, shows the expected and
/// actual values, and pretty-prints the recorded requests. An optional
/// trailing format string (with arguments) is appended to that message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once, so passing an
/// expression with side effects (for example `tracker.incremental_stats()`)
/// reports the same values that were compared.
///
/// ```ignore
/// assert_io_eq!(io_stats, read_iops, 1);
/// assert_io_eq!(io_stats, write_iops, 0, "should be no writes");
/// assert_io_eq!(io_stats, num_stages, 1, "should be just {}", "one stage");
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_eq {
    ($io_stats:expr, $field:ident, $expected:expr) => {
        // Bind both operands once (same idiom as `assert_eq!`) so temporaries
        // live for the whole check and nothing is re-evaluated on failure.
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert_eq!(
                    io_stats.$field, *expected,
                    "Expected {} to be {}, got {}. Requests: {:#?}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests
                );
            }
        }
    };
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert_eq!(
                    io_stats.$field, *expected,
                    "Expected {} to be {}, got {}. Requests: {:#?} {}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests,
                    format_args!($($arg)+)
                );
            }
        }
    };
}
140
/// Assert that a field of an `IoStats` value is strictly greater than a bound.
///
/// On failure the panic message names the field, shows the bound and the
/// actual value, and pretty-prints the recorded requests. An optional
/// trailing format string (with arguments) is appended to that message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once.
///
/// ```ignore
/// assert_io_gt!(io_stats, read_bytes, 0);
/// assert_io_gt!(io_stats, read_iops, 1, "expected more than {} read", 1);
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_gt {
    ($io_stats:expr, $field:ident, $expected:expr) => {
        // Bind both operands once so nothing is re-evaluated when the failure
        // message is formatted.
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field > *expected,
                    "Expected {} to be > {}, got {}. Requests: {:#?}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests
                );
            }
        }
    };
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field > *expected,
                    "Expected {} to be > {}, got {}. Requests: {:#?} {}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests,
                    format_args!($($arg)+)
                );
            }
        }
    };
}
166
/// Assert that a field of an `IoStats` value is strictly less than a bound.
///
/// On failure the panic message names the field, shows the bound and the
/// actual value, and pretty-prints the recorded requests. An optional
/// trailing format string (with arguments) is appended to that message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once.
///
/// ```ignore
/// assert_io_lt!(io_stats, read_iops, 10);
/// assert_io_lt!(io_stats, read_bytes, 4096, "read more than {} bytes", 4096);
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_lt {
    ($io_stats:expr, $field:ident, $expected:expr) => {
        // Bind both operands once so nothing is re-evaluated when the failure
        // message is formatted.
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field < *expected,
                    "Expected {} to be < {}, got {}. Requests: {:#?}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests
                );
            }
        }
    };
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field < *expected,
                    "Expected {} to be < {}, got {}. Requests: {:#?} {}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests,
                    format_args!($($arg)+)
                );
            }
        }
    };
}
192
// These fields are "dead code" because we just use them right now to display
// in test failure messages through Debug. (The lint ignores Debug impls.)
/// Description of a single recorded IO operation.
#[allow(dead_code)]
#[derive(Clone)]
pub struct IoRequestRecord {
    /// Name of the operation that was recorded, e.g. `"get_range"` or `"put"`.
    pub method: &'static str,
    /// Object path the operation was issued against.
    pub path: Path,
    /// Byte range associated with the request, when the caller supplied one.
    pub range: Option<Range<u64>>,
}
202
203impl std::fmt::Debug for IoRequestRecord {
204    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
205        // For example: "put /path/to/file range: 0-100"
206        write!(
207            f,
208            "IORequest(method={}, path=\"{}\"",
209            self.method, self.path
210        )?;
211        if let Some(range) = &self.range {
212            write!(f, ", range={:?}", range)?;
213        }
214        write!(f, ")")?;
215        Ok(())
216    }
217}
218
219impl Display for IoStats {
220    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221        write!(f, "{:#?}", self)
222    }
223}
224
/// An [`ObjectStore`] wrapper that forwards every call to `target` while
/// recording the operation in a shared [`IoStats`].
#[derive(Debug)]
pub struct IoTrackingStore {
    /// The wrapped store that actually performs the IO.
    target: Arc<dyn ObjectStore>,
    /// Counters updated by this store; may be shared with an [`IOTracker`].
    stats: Arc<Mutex<IoStats>>,
    /// Number of requests currently in flight, used to derive `num_stages`.
    #[cfg(feature = "test-util")]
    active_requests: Arc<AtomicU16>,
}
232
233impl Display for IoTrackingStore {
234    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
235        write!(f, "{:#?}", self)
236    }
237}
238
239impl IoTrackingStore {
240    pub fn new(target: Arc<dyn ObjectStore>, stats: Arc<Mutex<IoStats>>) -> Self {
241        Self {
242            target,
243            stats,
244            #[cfg(feature = "test-util")]
245            active_requests: Arc::new(AtomicU16::new(0)),
246        }
247    }
248
249    fn record_read(
250        &self,
251        method: &'static str,
252        path: Path,
253        num_bytes: u64,
254        range: Option<Range<u64>>,
255    ) {
256        let mut stats = self.stats.lock().unwrap();
257        stats.read_iops += 1;
258        stats.read_bytes += num_bytes;
259        #[cfg(feature = "test-util")]
260        stats.requests.push(IoRequestRecord {
261            method,
262            path,
263            range,
264        });
265        #[cfg(not(feature = "test-util"))]
266        let _ = (method, path, range); // Suppress unused variable warnings
267    }
268
269    fn record_write(&self, method: &'static str, path: Path, num_bytes: u64) {
270        let mut stats = self.stats.lock().unwrap();
271        stats.write_iops += 1;
272        stats.written_bytes += num_bytes;
273        #[cfg(feature = "test-util")]
274        stats.requests.push(IoRequestRecord {
275            method,
276            path,
277            range: None,
278        });
279        #[cfg(not(feature = "test-util"))]
280        let _ = (method, path); // Suppress unused variable warnings
281    }
282
283    #[cfg(feature = "test-util")]
284    fn stage_guard(&self) -> StageGuard {
285        StageGuard::new(self.active_requests.clone(), self.stats.clone())
286    }
287
288    #[cfg(not(feature = "test-util"))]
289    fn stage_guard(&self) -> StageGuard {
290        StageGuard
291    }
292}
293
294#[async_trait::async_trait]
295#[deny(clippy::missing_trait_methods)]
296impl ObjectStore for IoTrackingStore {
297    async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
298        let _guard = self.stage_guard();
299        self.record_write("put", location.to_owned(), bytes.content_length() as u64);
300        self.target.put(location, bytes).await
301    }
302
303    async fn put_opts(
304        &self,
305        location: &Path,
306        bytes: PutPayload,
307        opts: PutOptions,
308    ) -> OSResult<PutResult> {
309        let _guard = self.stage_guard();
310        self.record_write(
311            "put_opts",
312            location.to_owned(),
313            bytes.content_length() as u64,
314        );
315        self.target.put_opts(location, bytes, opts).await
316    }
317
318    async fn put_multipart(&self, location: &Path) -> OSResult<Box<dyn MultipartUpload>> {
319        let _guard = self.stage_guard();
320        let target = self.target.put_multipart(location).await?;
321        Ok(Box::new(IoTrackingMultipartUpload {
322            target,
323            stats: self.stats.clone(),
324            #[cfg(feature = "test-util")]
325            path: location.to_owned(),
326            #[cfg(feature = "test-util")]
327            _guard,
328        }))
329    }
330
331    async fn put_multipart_opts(
332        &self,
333        location: &Path,
334        opts: PutMultipartOptions,
335    ) -> OSResult<Box<dyn MultipartUpload>> {
336        let _guard = self.stage_guard();
337        let target = self.target.put_multipart_opts(location, opts).await?;
338        Ok(Box::new(IoTrackingMultipartUpload {
339            target,
340            stats: self.stats.clone(),
341            #[cfg(feature = "test-util")]
342            path: location.to_owned(),
343            #[cfg(feature = "test-util")]
344            _guard,
345        }))
346    }
347
348    async fn get(&self, location: &Path) -> OSResult<GetResult> {
349        let _guard = self.stage_guard();
350        let result = self.target.get(location).await;
351        if let Ok(result) = &result {
352            let num_bytes = result.range.end - result.range.start;
353            self.record_read("get", location.to_owned(), num_bytes, None);
354        }
355        result
356    }
357
358    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
359        let _guard = self.stage_guard();
360        let range = match &options.range {
361            Some(GetRange::Bounded(range)) => Some(range.clone()),
362            _ => None, // TODO: fill in other options.
363        };
364        let result = self.target.get_opts(location, options).await;
365        if let Ok(result) = &result {
366            let num_bytes = result.range.end - result.range.start;
367
368            self.record_read("get_opts", location.to_owned(), num_bytes, range);
369        }
370        result
371    }
372
373    async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
374        let _guard = self.stage_guard();
375        let result = self.target.get_range(location, range.clone()).await;
376        if let Ok(result) = &result {
377            self.record_read(
378                "get_range",
379                location.to_owned(),
380                result.len() as u64,
381                Some(range),
382            );
383        }
384        result
385    }
386
387    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
388        let _guard = self.stage_guard();
389        let result = self.target.get_ranges(location, ranges).await;
390        if let Ok(result) = &result {
391            self.record_read(
392                "get_ranges",
393                location.to_owned(),
394                result.iter().map(|b| b.len() as u64).sum(),
395                None,
396            );
397        }
398        result
399    }
400
401    async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
402        let _guard = self.stage_guard();
403        self.record_read("head", location.to_owned(), 0, None);
404        self.target.head(location).await
405    }
406
407    async fn delete(&self, location: &Path) -> OSResult<()> {
408        let _guard = self.stage_guard();
409        self.record_write("delete", location.to_owned(), 0);
410        self.target.delete(location).await
411    }
412
413    fn delete_stream<'a>(
414        &'a self,
415        locations: BoxStream<'a, OSResult<Path>>,
416    ) -> BoxStream<'a, OSResult<Path>> {
417        self.target.delete_stream(locations)
418    }
419
420    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
421        let _guard = self.stage_guard();
422        self.record_read("list", prefix.cloned().unwrap_or_default(), 0, None);
423        self.target.list(prefix)
424    }
425
426    fn list_with_offset(
427        &self,
428        prefix: Option<&Path>,
429        offset: &Path,
430    ) -> BoxStream<'static, OSResult<ObjectMeta>> {
431        self.record_read(
432            "list_with_offset",
433            prefix.cloned().unwrap_or_default(),
434            0,
435            None,
436        );
437        self.target.list_with_offset(prefix, offset)
438    }
439
440    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
441        let _guard = self.stage_guard();
442        self.record_read(
443            "list_with_delimiter",
444            prefix.cloned().unwrap_or_default(),
445            0,
446            None,
447        );
448        self.target.list_with_delimiter(prefix).await
449    }
450
451    async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
452        let _guard = self.stage_guard();
453        self.record_write("copy", from.to_owned(), 0);
454        self.target.copy(from, to).await
455    }
456
457    async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
458        let _guard = self.stage_guard();
459        self.record_write("rename", from.to_owned(), 0);
460        self.target.rename(from, to).await
461    }
462
463    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
464        let _guard = self.stage_guard();
465        self.record_write("rename_if_not_exists", from.to_owned(), 0);
466        self.target.rename_if_not_exists(from, to).await
467    }
468
469    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
470        let _guard = self.stage_guard();
471        self.record_write("copy_if_not_exists", from.to_owned(), 0);
472        self.target.copy_if_not_exists(from, to).await
473    }
474}
475
/// A [`MultipartUpload`] wrapper that records each uploaded part as a write.
#[derive(Debug)]
struct IoTrackingMultipartUpload {
    /// The wrapped upload that actually performs the IO.
    target: Box<dyn MultipartUpload>,
    /// Destination path, kept only to label request records.
    #[cfg(feature = "test-util")]
    path: Path,
    /// Counters shared with the store that created this upload.
    stats: Arc<Mutex<IoStats>>,
    /// Keeps the creating request's stage open until this upload is dropped.
    #[cfg(feature = "test-util")]
    _guard: StageGuard,
}
485
486#[async_trait::async_trait]
487impl MultipartUpload for IoTrackingMultipartUpload {
488    async fn abort(&mut self) -> OSResult<()> {
489        self.target.abort().await
490    }
491
492    async fn complete(&mut self) -> OSResult<PutResult> {
493        self.target.complete().await
494    }
495
496    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
497        {
498            let mut stats = self.stats.lock().unwrap();
499            stats.write_iops += 1;
500            stats.written_bytes += payload.content_length() as u64;
501            #[cfg(feature = "test-util")]
502            stats.requests.push(IoRequestRecord {
503                method: "put_part",
504                path: self.path.to_owned(),
505                range: None,
506            });
507        }
508        self.target.put_part(payload)
509    }
510}
511
/// Marks one request as in flight for as long as the value is alive.
///
/// Creating a guard increments the shared in-flight counter and dropping it
/// decrements the counter again. When the drop brings the counter back to
/// zero, `num_stages` in the shared stats is incremented.
#[cfg(feature = "test-util")]
#[derive(Debug)]
struct StageGuard {
    /// In-flight request counter shared with the owning store.
    active_requests: Arc<AtomicU16>,
    /// Stats whose `num_stages` is bumped when the last in-flight request ends.
    stats: Arc<Mutex<IoStats>>,
}

/// Placeholder used when stage tracking is compiled out; it carries no state.
#[cfg(not(feature = "test-util"))]
struct StageGuard;
521
#[cfg(feature = "test-util")]
impl StageGuard {
    /// Register one more in-flight request and return the guard that will
    /// unregister it when dropped.
    fn new(active_requests: Arc<AtomicU16>, stats: Arc<Mutex<IoStats>>) -> Self {
        use std::sync::atomic::Ordering;

        let guard = Self {
            active_requests,
            stats,
        };
        guard.active_requests.fetch_add(1, Ordering::SeqCst);
        guard
    }
}
532
#[cfg(feature = "test-util")]
impl Drop for StageGuard {
    /// Unregister this request; if it was the last one in flight, close the
    /// current stage by incrementing `num_stages`.
    fn drop(&mut self) {
        // `fetch_sub` returns the value *before* the decrement, so `1` means
        // this guard was the last in-flight request.
        let previously_active = self
            .active_requests
            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
        if previously_active == 1 {
            // Never panic inside `drop`: if the guard is dropped while the
            // thread is already unwinding, a second panic aborts the process.
            // A poisoned lock still guards plain counters, so recover the
            // inner value and keep counting.
            let mut stats = self
                .stats
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            stats.num_stages += 1;
        }
    }
}
545}