1use std::fmt::{Display, Formatter};
12use std::ops::Range;
13#[cfg(feature = "test-util")]
14use std::sync::atomic::AtomicU16;
15use std::sync::{Arc, Mutex};
16
17use bytes::Bytes;
18use futures::StreamExt;
19use futures::TryStreamExt;
20use futures::stream::BoxStream;
21use object_store::path::Path;
22use object_store::{
23 CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
24 ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions,
25 Result as OSResult, UploadPart,
26};
27
28use crate::object_store::WrappingObjectStore;
29
30#[derive(Debug, Default, Clone)]
31pub struct IOTracker(Arc<Mutex<IoStats>>);
32
33impl IOTracker {
34 pub fn incremental_stats(&self) -> IoStats {
39 std::mem::take(&mut *self.0.lock().unwrap())
40 }
41
42 pub fn stats(&self) -> IoStats {
47 self.0.lock().unwrap().clone()
48 }
49
50 pub fn record_read(
55 &self,
56 #[allow(unused_variables)] method: &'static str,
57 #[allow(unused_variables)] path: Path,
58 num_bytes: u64,
59 #[allow(unused_variables)] range: Option<Range<u64>>,
60 ) {
61 let mut stats = self.0.lock().unwrap();
62 stats.read_iops += 1;
63 stats.read_bytes += num_bytes;
64 #[cfg(feature = "test-util")]
65 stats.requests.push(IoRequestRecord {
66 method,
67 path,
68 range,
69 });
70 }
71
72 pub fn record_write(
77 &self,
78 #[allow(unused_variables)] method: &'static str,
79 #[allow(unused_variables)] path: Path,
80 num_bytes: u64,
81 ) {
82 let mut stats = self.0.lock().unwrap();
83 stats.write_iops += 1;
84 stats.written_bytes += num_bytes;
85 #[cfg(feature = "test-util")]
86 stats.requests.push(IoRequestRecord {
87 method,
88 path,
89 range: None,
90 });
91 }
92}
93
94impl WrappingObjectStore for IOTracker {
95 fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
96 Arc::new(IoTrackingStore::new(target, self.0.clone()))
97 }
98}
99
100#[derive(Debug, Default, Clone)]
101pub struct IoStats {
102 pub read_iops: u64,
103 pub read_bytes: u64,
104 pub write_iops: u64,
105 pub written_bytes: u64,
106 #[cfg(feature = "test-util")]
108 pub num_stages: u64,
110 #[cfg(feature = "test-util")]
111 pub requests: Vec<IoRequestRecord>,
112}
113
114#[cfg(feature = "test-util")]
119#[macro_export]
120macro_rules! assert_io_eq {
121 ($io_stats:expr, $field:ident, $expected:expr) => {
122 assert_eq!(
123 $io_stats.$field, $expected,
124 "Expected {} to be {}, got {}. Requests: {:#?}",
125 stringify!($field),
126 $expected,
127 $io_stats.$field,
128 $io_stats.requests
129 );
130 };
131 ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
132 assert_eq!(
133 $io_stats.$field, $expected,
134 "Expected {} to be {}, got {}. Requests: {:#?} {}",
135 stringify!($field),
136 $expected,
137 $io_stats.$field,
138 $io_stats.requests,
139 format_args!($($arg)+)
140 );
141 };
142}
143
144#[cfg(feature = "test-util")]
145#[macro_export]
146macro_rules! assert_io_gt {
147 ($io_stats:expr, $field:ident, $expected:expr) => {
148 assert!(
149 $io_stats.$field > $expected,
150 "Expected {} to be > {}, got {}. Requests: {:#?}",
151 stringify!($field),
152 $expected,
153 $io_stats.$field,
154 $io_stats.requests
155 );
156 };
157 ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
158 assert!(
159 $io_stats.$field > $expected,
160 "Expected {} to be > {}, got {}. Requests: {:#?} {}",
161 stringify!($field),
162 $expected,
163 $io_stats.$field,
164 $io_stats.requests,
165 format_args!($($arg)+)
166 );
167 };
168}
169
170#[cfg(feature = "test-util")]
171#[macro_export]
172macro_rules! assert_io_lt {
173 ($io_stats:expr, $field:ident, $expected:expr) => {
174 assert!(
175 $io_stats.$field < $expected,
176 "Expected {} to be < {}, got {}. Requests: {:#?}",
177 stringify!($field),
178 $expected,
179 $io_stats.$field,
180 $io_stats.requests
181 );
182 };
183 ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
184 assert!(
185 $io_stats.$field < $expected,
186 "Expected {} to be < {}, got {}. Requests: {:#?} {}",
187 stringify!($field),
188 $expected,
189 $io_stats.$field,
190 $io_stats.requests,
191 format_args!($($arg)+)
192 );
193 };
194}
195
196#[cfg(feature = "test-util")]
198#[derive(Clone)]
199pub struct IoRequestRecord {
200 pub method: &'static str,
201 pub path: Path,
202 pub range: Option<Range<u64>>,
203}
204
205#[cfg(feature = "test-util")]
206impl std::fmt::Debug for IoRequestRecord {
207 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208 write!(
210 f,
211 "IORequest(method={}, path=\"{}\"",
212 self.method, self.path
213 )?;
214 if let Some(range) = &self.range {
215 write!(f, ", range={:?}", range)?;
216 }
217 write!(f, ")")?;
218 Ok(())
219 }
220}
221
222impl Display for IoStats {
223 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224 write!(f, "{:#?}", self)
225 }
226}
227
228#[derive(Debug)]
229pub struct IoTrackingStore {
230 target: Arc<dyn ObjectStore>,
231 stats: Arc<Mutex<IoStats>>,
232 #[cfg(feature = "test-util")]
233 active_requests: Arc<AtomicU16>,
234}
235
236impl Display for IoTrackingStore {
237 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
238 write!(f, "{:#?}", self)
239 }
240}
241
242impl IoTrackingStore {
243 pub fn new(target: Arc<dyn ObjectStore>, stats: Arc<Mutex<IoStats>>) -> Self {
244 Self {
245 target,
246 stats,
247 #[cfg(feature = "test-util")]
248 active_requests: Arc::new(AtomicU16::new(0)),
249 }
250 }
251
252 fn record_read(
253 &self,
254 method: &'static str,
255 path: Path,
256 num_bytes: u64,
257 range: Option<Range<u64>>,
258 ) {
259 let mut stats = self.stats.lock().unwrap();
260 stats.read_iops += 1;
261 stats.read_bytes += num_bytes;
262 #[cfg(feature = "test-util")]
263 stats.requests.push(IoRequestRecord {
264 method,
265 path,
266 range,
267 });
268 #[cfg(not(feature = "test-util"))]
269 let _ = (method, path, range); }
271
272 fn record_write(&self, method: &'static str, path: Path, num_bytes: u64) {
273 let mut stats = self.stats.lock().unwrap();
274 stats.write_iops += 1;
275 stats.written_bytes += num_bytes;
276 #[cfg(feature = "test-util")]
277 stats.requests.push(IoRequestRecord {
278 method,
279 path,
280 range: None,
281 });
282 #[cfg(not(feature = "test-util"))]
283 let _ = (method, path); }
285
286 #[cfg(feature = "test-util")]
287 fn stage_guard(&self) -> StageGuard {
288 StageGuard::new(self.active_requests.clone(), self.stats.clone())
289 }
290
291 #[cfg(not(feature = "test-util"))]
292 fn stage_guard(&self) -> StageGuard {
293 StageGuard
294 }
295}
296
297#[async_trait::async_trait]
298#[deny(clippy::missing_trait_methods)]
299impl ObjectStore for IoTrackingStore {
300 async fn put_opts(
301 &self,
302 location: &Path,
303 bytes: PutPayload,
304 opts: PutOptions,
305 ) -> OSResult<PutResult> {
306 let _guard = self.stage_guard();
307 self.record_write(
308 "put_opts",
309 location.to_owned(),
310 bytes.content_length() as u64,
311 );
312 self.target.put_opts(location, bytes, opts).await
313 }
314
315 async fn put_multipart_opts(
316 &self,
317 location: &Path,
318 opts: PutMultipartOptions,
319 ) -> OSResult<Box<dyn MultipartUpload>> {
320 let _guard = self.stage_guard();
321 let target = self.target.put_multipart_opts(location, opts).await?;
322 Ok(Box::new(IoTrackingMultipartUpload {
323 target,
324 stats: self.stats.clone(),
325 #[cfg(feature = "test-util")]
326 path: location.to_owned(),
327 #[cfg(feature = "test-util")]
328 _guard,
329 }))
330 }
331
332 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
333 let _guard = self.stage_guard();
334 let range = match &options.range {
335 Some(GetRange::Bounded(range)) => Some(range.clone()),
336 _ => None, };
338 let result = self.target.get_opts(location, options).await;
339 if let Ok(result) = &result {
340 let num_bytes = result.range.end - result.range.start;
341
342 self.record_read("get_opts", location.to_owned(), num_bytes, range);
343 }
344 result
345 }
346
347 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
348 let _guard = self.stage_guard();
349 let result = self.target.get_ranges(location, ranges).await;
350 if let Ok(result) = &result {
351 self.record_read(
352 "get_ranges",
353 location.to_owned(),
354 result.iter().map(|b| b.len() as u64).sum(),
355 None,
356 );
357 }
358 result
359 }
360
361 fn delete_stream(
362 &self,
363 locations: BoxStream<'static, OSResult<Path>>,
364 ) -> BoxStream<'static, OSResult<Path>> {
365 let stats = Arc::clone(&self.stats);
366 let tracked = locations
367 .map_ok(move |path| {
368 let mut stats = stats.lock().unwrap();
369 stats.write_iops += 1;
370 #[cfg(feature = "test-util")]
371 stats.requests.push(IoRequestRecord {
372 method: "delete",
373 path: path.clone(),
374 range: None,
375 });
376 path
377 })
378 .boxed();
379 self.target.delete_stream(tracked)
380 }
381
382 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
383 let _guard = self.stage_guard();
384 self.record_read("list", prefix.cloned().unwrap_or_default(), 0, None);
385 self.target.list(prefix)
386 }
387
388 fn list_with_offset(
389 &self,
390 prefix: Option<&Path>,
391 offset: &Path,
392 ) -> BoxStream<'static, OSResult<ObjectMeta>> {
393 self.record_read(
394 "list_with_offset",
395 prefix.cloned().unwrap_or_default(),
396 0,
397 None,
398 );
399 self.target.list_with_offset(prefix, offset)
400 }
401
402 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
403 let _guard = self.stage_guard();
404 self.record_read(
405 "list_with_delimiter",
406 prefix.cloned().unwrap_or_default(),
407 0,
408 None,
409 );
410 self.target.list_with_delimiter(prefix).await
411 }
412
413 async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> {
414 let _guard = self.stage_guard();
415 self.record_write("copy", from.to_owned(), 0);
416 self.target.copy_opts(from, to, opts).await
417 }
418
419 async fn rename_opts(&self, from: &Path, to: &Path, opts: RenameOptions) -> OSResult<()> {
420 let _guard = self.stage_guard();
421 self.record_write("rename", from.to_owned(), 0);
422 self.target.rename_opts(from, to, opts).await
423 }
424}
425
426#[derive(Debug)]
427struct IoTrackingMultipartUpload {
428 target: Box<dyn MultipartUpload>,
429 #[cfg(feature = "test-util")]
430 path: Path,
431 stats: Arc<Mutex<IoStats>>,
432 #[cfg(feature = "test-util")]
433 _guard: StageGuard,
434}
435
436#[async_trait::async_trait]
437impl MultipartUpload for IoTrackingMultipartUpload {
438 async fn abort(&mut self) -> OSResult<()> {
439 self.target.abort().await
440 }
441
442 async fn complete(&mut self) -> OSResult<PutResult> {
443 self.target.complete().await
444 }
445
446 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
447 {
448 let mut stats = self.stats.lock().unwrap();
449 stats.write_iops += 1;
450 stats.written_bytes += payload.content_length() as u64;
451 #[cfg(feature = "test-util")]
452 stats.requests.push(IoRequestRecord {
453 method: "put_part",
454 path: self.path.to_owned(),
455 range: None,
456 });
457 }
458 self.target.put_part(payload)
459 }
460}
461
462#[cfg(feature = "test-util")]
463#[derive(Debug)]
464struct StageGuard {
465 active_requests: Arc<AtomicU16>,
466 stats: Arc<Mutex<IoStats>>,
467}
468
469#[cfg(not(feature = "test-util"))]
470struct StageGuard;
471
472#[cfg(feature = "test-util")]
473impl StageGuard {
474 fn new(active_requests: Arc<AtomicU16>, stats: Arc<Mutex<IoStats>>) -> Self {
475 active_requests.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
476 Self {
477 active_requests,
478 stats,
479 }
480 }
481}
482
483#[cfg(feature = "test-util")]
484impl Drop for StageGuard {
485 fn drop(&mut self) {
486 if self
487 .active_requests
488 .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
489 == 1
490 {
491 let mut stats = self.stats.lock().unwrap();
492 stats.num_stages += 1;
493 }
494 }
495}