Skip to main content

lance/dataset/
progress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use lance_table::format::Fragment;
8
9use crate::Result;
10
11/// Progress of writing a [Fragment].
12///
13/// When start writing a [`Fragment`], WriteProgress::begin() will be called before
14/// writing any data.
15///
16/// When stop writing a [`Fragment`], WriteProgress::complete() will be called after.
17///
18/// This might be called concurrently when writing multiple [`Fragment`]s. Therefore,
19/// the methods require non-exclusive access to `self`.
20///
21/// This is an experimental API and may change in the future.
22#[async_trait]
23pub trait WriteFragmentProgress: std::fmt::Debug + Sync + Send {
24    /// Indicate the beginning of writing a [Fragment], with the in-flight multipart ID.
25    async fn begin(&self, fragment: &Fragment) -> Result<()>;
26
27    /// Complete writing a [Fragment].
28    async fn complete(&self, fragment: &Fragment) -> Result<()>;
29}
30
/// Snapshot of write statistics passed to the write-progress callback.
///
/// The callback is registered via
/// [`InsertBuilder::progress`](crate::dataset::InsertBuilder::progress) or
/// [`WriteParams::write_progress`](crate::dataset::WriteParams::write_progress).
///
/// All counters are cumulative. The type implements `PartialEq`/`Eq` so that
/// snapshots received by a callback can be compared directly (for example in
/// tests, or to detect that nothing changed between two reports).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WriteStats {
    /// Cumulative bytes handed to the writer so far.
    ///
    /// For local storage this closely tracks bytes flushed to disk. For cloud
    /// object stores (S3, GCS, Azure) this reflects bytes handed to the
    /// multipart-upload buffer; actual network I/O may lag slightly.
    pub bytes_written: u64,
    /// Cumulative rows written so far.
    pub rows_written: u64,
    /// Number of files (fragments) whose writes have completed so far.
    pub files_written: u32,
}
47
48/// An opaque wrapper around a write-progress closure.
49///
50/// Stored inside [`WriteParams::write_progress`](crate::dataset::WriteParams::write_progress).
51/// Construct via [`InsertBuilder::progress`](crate::dataset::InsertBuilder::progress) or
52/// directly with [`WriteProgressFn::new`].
53#[derive(Clone)]
54pub struct WriteProgressFn(Arc<dyn Fn(WriteStats) + Send + Sync>);
55
56impl WriteProgressFn {
57    pub fn new(f: impl Fn(WriteStats) + Send + Sync + 'static) -> Self {
58        Self(Arc::new(f))
59    }
60
61    pub(crate) fn call(&self, stats: WriteStats) {
62        (self.0)(stats);
63    }
64}
65
66impl std::fmt::Debug for WriteProgressFn {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("WriteProgressFn").finish_non_exhaustive()
69    }
70}
71
/// Progress tracker that does nothing.
///
/// This no-op tracker is the one used by default.
#[derive(Clone, Debug, Default)]
pub struct NoopFragmentWriteProgress {}
75
76impl NoopFragmentWriteProgress {
77    pub fn new() -> Self {
78        Self {}
79    }
80}
81
82#[async_trait]
83impl WriteFragmentProgress for NoopFragmentWriteProgress {
84    #[inline]
85    async fn begin(&self, _fragment: &Fragment) -> Result<()> {
86        Ok(())
87    }
88
89    #[inline]
90    async fn complete(&self, _fragment: &Fragment) -> Result<()> {
91        Ok(())
92    }
93}