lance/dataset/progress.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use lance_table::format::Fragment;
8
9use crate::Result;
10
/// Progress of writing a [Fragment].
///
/// When writing of a [`Fragment`] starts, [`WriteFragmentProgress::begin`] is
/// called before any data is written.
///
/// When writing of a [`Fragment`] stops, [`WriteFragmentProgress::complete`] is
/// called afterwards.
///
/// These methods might be called concurrently when writing multiple
/// [`Fragment`]s. Therefore, the methods require non-exclusive access to `self`.
///
/// This is an experimental API and may change in the future.
#[async_trait]
pub trait WriteFragmentProgress: std::fmt::Debug + Sync + Send {
    /// Indicate the beginning of writing a [Fragment].
    ///
    /// The only input is the [`Fragment`] itself; no separate upload or
    /// multipart identifier is passed to this method.
    async fn begin(&self, fragment: &Fragment) -> Result<()>;

    /// Complete writing a [Fragment].
    ///
    /// Receives the same kind of input as [`WriteFragmentProgress::begin`]:
    /// a shared reference to the [`Fragment`] being reported.
    async fn complete(&self, fragment: &Fragment) -> Result<()>;
}
30
/// Statistics reported to the write progress callback set via
/// [`InsertBuilder::progress`](crate::dataset::InsertBuilder::progress) or
/// [`WriteParams::write_progress`](crate::dataset::WriteParams::write_progress).
///
/// Every field is a running total. [`Default`] yields a snapshot with all
/// counters at zero, and snapshots can be compared with `==`, which is handy
/// when asserting on reported progress in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WriteStats {
    /// Cumulative bytes handed to the writer so far.
    ///
    /// For local storage this closely tracks bytes flushed to disk. For cloud
    /// object stores (S3, GCS, Azure) this reflects bytes handed to the
    /// multipart-upload buffer; actual network I/O may lag slightly.
    pub bytes_written: u64,
    /// Cumulative rows written so far.
    pub rows_written: u64,
    /// Number of files (fragments) whose writes have completed so far.
    pub files_written: u32,
}
47
48/// An opaque wrapper around a write-progress closure.
49///
50/// Stored inside [`WriteParams::write_progress`](crate::dataset::WriteParams::write_progress).
51/// Construct via [`InsertBuilder::progress`](crate::dataset::InsertBuilder::progress) or
52/// directly with [`WriteProgressFn::new`].
53#[derive(Clone)]
54pub struct WriteProgressFn(Arc<dyn Fn(WriteStats) + Send + Sync>);
55
56impl WriteProgressFn {
57 pub fn new(f: impl Fn(WriteStats) + Send + Sync + 'static) -> Self {
58 Self(Arc::new(f))
59 }
60
61 pub(crate) fn call(&self, stats: WriteStats) {
62 (self.0)(stats);
63 }
64}
65
66impl std::fmt::Debug for WriteProgressFn {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("WriteProgressFn").finish_non_exhaustive()
69 }
70}
71
/// No-op progress tracker. This is the tracker used by default.
///
/// The type carries no state, so every instance is interchangeable.
#[derive(Debug, Clone, Default)]
pub struct NoopFragmentWriteProgress {}

impl NoopFragmentWriteProgress {
    /// Creates a new no-op tracker; same value as the derived [`Default`].
    pub fn new() -> Self {
        Self::default()
    }
}
81
// No-op implementation of the progress trait: both hooks ignore the fragment
// and report success immediately.
#[async_trait]
impl WriteFragmentProgress for NoopFragmentWriteProgress {
    /// Does nothing with `_fragment` and always returns `Ok(())`.
    #[inline]
    async fn begin(&self, _fragment: &Fragment) -> Result<()> {
        Ok(())
    }

    /// Does nothing with `_fragment` and always returns `Ok(())`.
    #[inline]
    async fn complete(&self, _fragment: &Fragment) -> Result<()> {
        Ok(())
    }
}