1use std::fmt::{Display, Formatter};
12use std::ops::Range;
13#[cfg(feature = "test-util")]
14use std::sync::atomic::AtomicU16;
15use std::sync::{Arc, Mutex};
16
17use bytes::Bytes;
18use futures::stream::BoxStream;
19use object_store::path::Path;
20use object_store::{
21 GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
22 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
23};
24
25use crate::object_store::WrappingObjectStore;
26
27#[derive(Debug, Default, Clone)]
28pub struct IOTracker(Arc<Mutex<IoStats>>);
29
30impl IOTracker {
31 pub fn incremental_stats(&self) -> IoStats {
36 std::mem::take(&mut *self.0.lock().unwrap())
37 }
38
39 pub fn stats(&self) -> IoStats {
44 self.0.lock().unwrap().clone()
45 }
46
47 pub fn record_read(
52 &self,
53 #[allow(unused_variables)] method: &'static str,
54 #[allow(unused_variables)] path: Path,
55 num_bytes: u64,
56 #[allow(unused_variables)] range: Option<Range<u64>>,
57 ) {
58 let mut stats = self.0.lock().unwrap();
59 stats.read_iops += 1;
60 stats.read_bytes += num_bytes;
61 #[cfg(feature = "test-util")]
62 stats.requests.push(IoRequestRecord {
63 method,
64 path,
65 range,
66 });
67 }
68
69 pub fn record_write(
74 &self,
75 #[allow(unused_variables)] method: &'static str,
76 #[allow(unused_variables)] path: Path,
77 num_bytes: u64,
78 ) {
79 let mut stats = self.0.lock().unwrap();
80 stats.write_iops += 1;
81 stats.written_bytes += num_bytes;
82 #[cfg(feature = "test-util")]
83 stats.requests.push(IoRequestRecord {
84 method,
85 path,
86 range: None,
87 });
88 }
89}
90
91impl WrappingObjectStore for IOTracker {
92 fn wrap(&self, _store_prefix: &str, target: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
93 Arc::new(IoTrackingStore::new(target, self.0.clone()))
94 }
95}
96
/// Counters describing the I/O performed through a tracked object store.
///
/// All counters start at zero (`Default`).
#[derive(Clone, Debug, Default)]
pub struct IoStats {
    /// Number of recorded read requests.
    pub read_iops: u64,
    /// Total number of bytes reported by recorded read requests.
    pub read_bytes: u64,
    /// Number of recorded write requests.
    pub write_iops: u64,
    /// Total number of bytes reported by recorded write requests.
    pub written_bytes: u64,
    /// Number of times the count of in-flight guarded requests dropped back
    /// to zero (only tracked with the `test-util` feature).
    #[cfg(feature = "test-util")]
    pub num_stages: u64,
    /// Log of every recorded request, in the order it was recorded (only
    /// tracked with the `test-util` feature).
    #[cfg(feature = "test-util")]
    pub requests: Vec<IoRequestRecord>,
}
110
/// Asserts that a numeric field of an `IoStats` value equals `$expected`.
///
/// On failure the panic message names the field, shows the expected and the
/// actual value, and pretty-prints the recorded request log. An optional
/// trailing format string with arguments is appended to the message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once. The values
/// shown in the failure message are therefore the ones that were compared,
/// even when an argument has side effects (for example a call that resets
/// the counters).
///
/// ```ignore
/// assert_io_eq!(stats, read_iops, 1);
/// assert_io_eq!(stats, read_iops, 1, "after opening {}", name);
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_eq {
    ($io_stats:expr, $field:ident, $expected:expr) => {
        // `match` binds both operands once and keeps any temporaries alive
        // for the duration of the assertion.
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert_eq!(
                    io_stats.$field, *expected,
                    "Expected {} to be {}, got {}. Requests: {:#?}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests
                );
            }
        }
    };
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert_eq!(
                    io_stats.$field, *expected,
                    "Expected {} to be {}, got {}. Requests: {:#?} {}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests,
                    format_args!($($arg)+)
                );
            }
        }
    };
}
140
/// Asserts that a numeric field of an `IoStats` value is strictly greater
/// than `$expected`.
///
/// On failure the panic message names the field, shows the bound and the
/// actual value, and pretty-prints the recorded request log. An optional
/// trailing format string with arguments is appended to the message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once. The values
/// shown in the failure message are therefore the ones that were compared,
/// even when an argument has side effects.
///
/// ```ignore
/// assert_io_gt!(stats, read_bytes, 0);
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_gt {
    ($io_stats:expr, $field:ident, $expected:expr) => {
        // `match` binds both operands once and keeps any temporaries alive
        // for the duration of the assertion.
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field > *expected,
                    "Expected {} to be > {}, got {}. Requests: {:#?}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests
                );
            }
        }
    };
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field > *expected,
                    "Expected {} to be > {}, got {}. Requests: {:#?} {}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests,
                    format_args!($($arg)+)
                );
            }
        }
    };
}
166
/// Asserts that a numeric field of an `IoStats` value is strictly less than
/// `$expected`.
///
/// On failure the panic message names the field, shows the bound and the
/// actual value, and pretty-prints the recorded request log. An optional
/// trailing format string with arguments is appended to the message.
///
/// `$io_stats` and `$expected` are each evaluated exactly once. The values
/// shown in the failure message are therefore the ones that were compared,
/// even when an argument has side effects.
///
/// ```ignore
/// assert_io_lt!(stats, read_iops, 10);
/// ```
#[cfg(feature = "test-util")]
#[macro_export]
macro_rules! assert_io_lt {
    ($io_stats:expr, $field:ident, $expected:expr) => {
        // `match` binds both operands once and keeps any temporaries alive
        // for the duration of the assertion.
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field < *expected,
                    "Expected {} to be < {}, got {}. Requests: {:#?}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests
                );
            }
        }
    };
    ($io_stats:expr, $field:ident, $expected:expr, $($arg:tt)+) => {
        match (&$io_stats, &$expected) {
            (io_stats, expected) => {
                assert!(
                    io_stats.$field < *expected,
                    "Expected {} to be < {}, got {}. Requests: {:#?} {}",
                    stringify!($field),
                    expected,
                    io_stats.$field,
                    io_stats.requests,
                    format_args!($($arg)+)
                );
            }
        }
    };
}
192
193#[allow(dead_code)]
196#[derive(Clone)]
197pub struct IoRequestRecord {
198 pub method: &'static str,
199 pub path: Path,
200 pub range: Option<Range<u64>>,
201}
202
203impl std::fmt::Debug for IoRequestRecord {
204 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
205 write!(
207 f,
208 "IORequest(method={}, path=\"{}\"",
209 self.method, self.path
210 )?;
211 if let Some(range) = &self.range {
212 write!(f, ", range={:?}", range)?;
213 }
214 write!(f, ")")?;
215 Ok(())
216 }
217}
218
219impl Display for IoStats {
220 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221 write!(f, "{:#?}", self)
222 }
223}
224
225#[derive(Debug)]
226pub struct IoTrackingStore {
227 target: Arc<dyn ObjectStore>,
228 stats: Arc<Mutex<IoStats>>,
229 #[cfg(feature = "test-util")]
230 active_requests: Arc<AtomicU16>,
231}
232
233impl Display for IoTrackingStore {
234 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
235 write!(f, "{:#?}", self)
236 }
237}
238
239impl IoTrackingStore {
240 pub fn new(target: Arc<dyn ObjectStore>, stats: Arc<Mutex<IoStats>>) -> Self {
241 Self {
242 target,
243 stats,
244 #[cfg(feature = "test-util")]
245 active_requests: Arc::new(AtomicU16::new(0)),
246 }
247 }
248
249 fn record_read(
250 &self,
251 method: &'static str,
252 path: Path,
253 num_bytes: u64,
254 range: Option<Range<u64>>,
255 ) {
256 let mut stats = self.stats.lock().unwrap();
257 stats.read_iops += 1;
258 stats.read_bytes += num_bytes;
259 #[cfg(feature = "test-util")]
260 stats.requests.push(IoRequestRecord {
261 method,
262 path,
263 range,
264 });
265 #[cfg(not(feature = "test-util"))]
266 let _ = (method, path, range); }
268
269 fn record_write(&self, method: &'static str, path: Path, num_bytes: u64) {
270 let mut stats = self.stats.lock().unwrap();
271 stats.write_iops += 1;
272 stats.written_bytes += num_bytes;
273 #[cfg(feature = "test-util")]
274 stats.requests.push(IoRequestRecord {
275 method,
276 path,
277 range: None,
278 });
279 #[cfg(not(feature = "test-util"))]
280 let _ = (method, path); }
282
283 #[cfg(feature = "test-util")]
284 fn stage_guard(&self) -> StageGuard {
285 StageGuard::new(self.active_requests.clone(), self.stats.clone())
286 }
287
288 #[cfg(not(feature = "test-util"))]
289 fn stage_guard(&self) -> StageGuard {
290 StageGuard
291 }
292}
293
294#[async_trait::async_trait]
295#[deny(clippy::missing_trait_methods)]
296impl ObjectStore for IoTrackingStore {
297 async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
298 let _guard = self.stage_guard();
299 self.record_write("put", location.to_owned(), bytes.content_length() as u64);
300 self.target.put(location, bytes).await
301 }
302
303 async fn put_opts(
304 &self,
305 location: &Path,
306 bytes: PutPayload,
307 opts: PutOptions,
308 ) -> OSResult<PutResult> {
309 let _guard = self.stage_guard();
310 self.record_write(
311 "put_opts",
312 location.to_owned(),
313 bytes.content_length() as u64,
314 );
315 self.target.put_opts(location, bytes, opts).await
316 }
317
318 async fn put_multipart(&self, location: &Path) -> OSResult<Box<dyn MultipartUpload>> {
319 let _guard = self.stage_guard();
320 let target = self.target.put_multipart(location).await?;
321 Ok(Box::new(IoTrackingMultipartUpload {
322 target,
323 stats: self.stats.clone(),
324 #[cfg(feature = "test-util")]
325 path: location.to_owned(),
326 #[cfg(feature = "test-util")]
327 _guard,
328 }))
329 }
330
331 async fn put_multipart_opts(
332 &self,
333 location: &Path,
334 opts: PutMultipartOptions,
335 ) -> OSResult<Box<dyn MultipartUpload>> {
336 let _guard = self.stage_guard();
337 let target = self.target.put_multipart_opts(location, opts).await?;
338 Ok(Box::new(IoTrackingMultipartUpload {
339 target,
340 stats: self.stats.clone(),
341 #[cfg(feature = "test-util")]
342 path: location.to_owned(),
343 #[cfg(feature = "test-util")]
344 _guard,
345 }))
346 }
347
348 async fn get(&self, location: &Path) -> OSResult<GetResult> {
349 let _guard = self.stage_guard();
350 let result = self.target.get(location).await;
351 if let Ok(result) = &result {
352 let num_bytes = result.range.end - result.range.start;
353 self.record_read("get", location.to_owned(), num_bytes, None);
354 }
355 result
356 }
357
358 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
359 let _guard = self.stage_guard();
360 let range = match &options.range {
361 Some(GetRange::Bounded(range)) => Some(range.clone()),
362 _ => None, };
364 let result = self.target.get_opts(location, options).await;
365 if let Ok(result) = &result {
366 let num_bytes = result.range.end - result.range.start;
367
368 self.record_read("get_opts", location.to_owned(), num_bytes, range);
369 }
370 result
371 }
372
373 async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
374 let _guard = self.stage_guard();
375 let result = self.target.get_range(location, range.clone()).await;
376 if let Ok(result) = &result {
377 self.record_read(
378 "get_range",
379 location.to_owned(),
380 result.len() as u64,
381 Some(range),
382 );
383 }
384 result
385 }
386
387 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
388 let _guard = self.stage_guard();
389 let result = self.target.get_ranges(location, ranges).await;
390 if let Ok(result) = &result {
391 self.record_read(
392 "get_ranges",
393 location.to_owned(),
394 result.iter().map(|b| b.len() as u64).sum(),
395 None,
396 );
397 }
398 result
399 }
400
401 async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
402 let _guard = self.stage_guard();
403 self.record_read("head", location.to_owned(), 0, None);
404 self.target.head(location).await
405 }
406
407 async fn delete(&self, location: &Path) -> OSResult<()> {
408 let _guard = self.stage_guard();
409 self.record_write("delete", location.to_owned(), 0);
410 self.target.delete(location).await
411 }
412
413 fn delete_stream<'a>(
414 &'a self,
415 locations: BoxStream<'a, OSResult<Path>>,
416 ) -> BoxStream<'a, OSResult<Path>> {
417 self.target.delete_stream(locations)
418 }
419
420 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
421 let _guard = self.stage_guard();
422 self.record_read("list", prefix.cloned().unwrap_or_default(), 0, None);
423 self.target.list(prefix)
424 }
425
426 fn list_with_offset(
427 &self,
428 prefix: Option<&Path>,
429 offset: &Path,
430 ) -> BoxStream<'static, OSResult<ObjectMeta>> {
431 self.record_read(
432 "list_with_offset",
433 prefix.cloned().unwrap_or_default(),
434 0,
435 None,
436 );
437 self.target.list_with_offset(prefix, offset)
438 }
439
440 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
441 let _guard = self.stage_guard();
442 self.record_read(
443 "list_with_delimiter",
444 prefix.cloned().unwrap_or_default(),
445 0,
446 None,
447 );
448 self.target.list_with_delimiter(prefix).await
449 }
450
451 async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
452 let _guard = self.stage_guard();
453 self.record_write("copy", from.to_owned(), 0);
454 self.target.copy(from, to).await
455 }
456
457 async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
458 let _guard = self.stage_guard();
459 self.record_write("rename", from.to_owned(), 0);
460 self.target.rename(from, to).await
461 }
462
463 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
464 let _guard = self.stage_guard();
465 self.record_write("rename_if_not_exists", from.to_owned(), 0);
466 self.target.rename_if_not_exists(from, to).await
467 }
468
469 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
470 let _guard = self.stage_guard();
471 self.record_write("copy_if_not_exists", from.to_owned(), 0);
472 self.target.copy_if_not_exists(from, to).await
473 }
474}
475
476#[derive(Debug)]
477struct IoTrackingMultipartUpload {
478 target: Box<dyn MultipartUpload>,
479 #[cfg(feature = "test-util")]
480 path: Path,
481 stats: Arc<Mutex<IoStats>>,
482 #[cfg(feature = "test-util")]
483 _guard: StageGuard,
484}
485
486#[async_trait::async_trait]
487impl MultipartUpload for IoTrackingMultipartUpload {
488 async fn abort(&mut self) -> OSResult<()> {
489 self.target.abort().await
490 }
491
492 async fn complete(&mut self) -> OSResult<PutResult> {
493 self.target.complete().await
494 }
495
496 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
497 {
498 let mut stats = self.stats.lock().unwrap();
499 stats.write_iops += 1;
500 stats.written_bytes += payload.content_length() as u64;
501 #[cfg(feature = "test-util")]
502 stats.requests.push(IoRequestRecord {
503 method: "put_part",
504 path: self.path.to_owned(),
505 range: None,
506 });
507 }
508 self.target.put_part(payload)
509 }
510}
511
/// Marks one request as in flight for as long as the guard is alive.
///
/// Holds the shared in-flight counter together with the statistics that are
/// updated when the guard is dropped.
#[cfg(feature = "test-util")]
#[derive(Debug)]
struct StageGuard {
    /// Number of guards currently alive for the owning store.
    active_requests: Arc<AtomicU16>,
    /// Shared statistics updated when the guard is dropped.
    stats: Arc<Mutex<IoStats>>,
}

/// Placeholder used when stage tracking is compiled out.
#[cfg(not(feature = "test-util"))]
struct StageGuard;

#[cfg(feature = "test-util")]
impl StageGuard {
    /// Increments the in-flight counter and returns a guard holding it.
    fn new(active_requests: Arc<AtomicU16>, stats: Arc<Mutex<IoStats>>) -> Self {
        use std::sync::atomic::Ordering;

        let guard = Self {
            active_requests,
            stats,
        };
        guard.active_requests.fetch_add(1, Ordering::SeqCst);
        guard
    }
}
532
#[cfg(feature = "test-util")]
impl Drop for StageGuard {
    /// Decrements the in-flight counter; when this guard was the last one
    /// alive, counts one completed stage in the shared statistics.
    ///
    /// This never panics on a poisoned statistics mutex. A guard can be
    /// dropped while another panic is unwinding, and a second panic inside
    /// `drop` would abort the process.
    fn drop(&mut self) {
        let previously_active = self
            .active_requests
            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
        if previously_active == 1 {
            // Recover the guard from a poisoned lock: incrementing a counter
            // is safe even if another thread panicked while holding it.
            let mut stats = self
                .stats
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            stats.num_stages += 1;
        }
    }
}