// lance_io/object_writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::io;
5use std::pin::Pin;
6use std::sync::{Arc, OnceLock};
7use std::task::Poll;
8
9use crate::object_store::ObjectStore as LanceObjectStore;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::FutureExt;
13use futures::future::BoxFuture;
14use object_store::MultipartUpload;
15use object_store::{Error as OSError, ObjectStore, Result as OSResult, path::Path};
16use rand::Rng;
17use tokio::io::{AsyncWrite, AsyncWriteExt};
18use tokio::task::JoinSet;
19
20use lance_core::{Error, Result};
21use tracing::Instrument;
22
23use crate::traits::Writer;
24use crate::utils::tracking_store::IOTracker;
25use tokio::runtime::Handle;
26
/// Start at 5MB.
///
/// This is both the initial part-buffer capacity and the lower bound enforced
/// on `LANCE_INITIAL_UPLOAD_SIZE` (see [`clamp_initial_upload_size`]).
const INITIAL_UPLOAD_STEP: usize = 1024 * 1024 * 5;
29
/// Maximum number of part uploads in flight at once.
///
/// Read from `LANCE_UPLOAD_CONCURRENCY` (default 10); cached for the lifetime
/// of the process.
fn max_upload_parallelism() -> usize {
    static MAX_UPLOAD_PARALLELISM: OnceLock<usize> = OnceLock::new();
    *MAX_UPLOAD_PARALLELISM.get_or_init(|| {
        match std::env::var("LANCE_UPLOAD_CONCURRENCY") {
            // An unparseable value falls back to the default, same as unset.
            Ok(raw) => raw.parse::<usize>().unwrap_or(10),
            Err(_) => 10,
        }
    })
}
39
/// Maximum number of retries for "connection reset by peer" errors.
///
/// Read from `LANCE_CONN_RESET_RETRIES` (default 20); cached for the lifetime
/// of the process.
fn max_conn_reset_retries() -> u16 {
    static MAX_CONN_RESET_RETRIES: OnceLock<u16> = OnceLock::new();
    *MAX_CONN_RESET_RETRIES.get_or_init(|| {
        match std::env::var("LANCE_CONN_RESET_RETRIES") {
            // An unparseable value falls back to the default, same as unset.
            Ok(raw) => raw.parse::<u16>().unwrap_or(20),
            Err(_) => 20,
        }
    })
}
49
50/// Maximum part size in GCS and S3: 5GB.
51const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
52
53/// Clamps a requested upload part size to the valid [5MB, 5GB] range.
54/// Returns the clamped value and whether clamping was necessary.
55fn clamp_initial_upload_size(raw: usize) -> (usize, bool) {
56    let clamped = raw.clamp(INITIAL_UPLOAD_STEP, MAX_UPLOAD_PART_SIZE);
57    (clamped, clamped != raw)
58}
59
60fn initial_upload_size() -> usize {
61    static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
62    *LANCE_INITIAL_UPLOAD_SIZE.get_or_init(|| {
63        let Some(raw) = std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
64            .ok()
65            .and_then(|s| s.parse::<usize>().ok())
66        else {
67            return INITIAL_UPLOAD_STEP;
68        };
69        let (clamped, was_clamped) = clamp_initial_upload_size(raw);
70        if was_clamped {
71            // OnceLock caches the result, so this warning fires at most once per process.
72            tracing::warn!(
73                requested = raw,
74                clamped,
75                "LANCE_INITIAL_UPLOAD_SIZE must be between 5MB and 5GB; clamping to valid range"
76            );
77        }
78        clamped
79    })
80}
81
/// Writer to an object in an object store.
///
/// If the object is small enough, the writer will upload the object in a single
/// PUT request. If the object is larger, the writer will create a multipart
/// upload and upload parts in parallel.
///
/// This implements the `AsyncWrite` trait.
pub struct ObjectWriter {
    /// Current stage of the upload state machine (see [`UploadState`]).
    state: UploadState,
    /// Destination path in the object store.
    path: Arc<Path>,
    /// Total number of bytes accepted so far, across all parts.
    cursor: usize,
    /// How many "connection reset by peer" retries have been performed.
    connection_resets: u16,
    /// Bytes buffered for the next part (or for the single PUT).
    buffer: Vec<u8>,
    // TODO: use constant size to support R2
    /// When true, every part uses the initial size instead of the growing schedule.
    use_constant_size_upload_parts: bool,
}
98
/// Outcome of a completed write: total size and the store-reported ETag.
#[derive(Debug, Clone, Default)]
pub struct WriteResult {
    /// Total number of bytes written to the object.
    pub size: usize,
    /// ETag returned by the store, if it provided one.
    pub e_tag: Option<String>,
}
104
/// State machine driving an [`ObjectWriter`] upload.
enum UploadState {
    /// The writer has been opened but no data has been written yet. Will be in
    /// this state until the buffer is full or the writer is shut down.
    Started(Arc<dyn ObjectStore>),
    /// The writer is in the process of creating a multipart upload.
    CreatingUpload(BoxFuture<'static, OSResult<Box<dyn MultipartUpload>>>),
    /// The writer is in the process of uploading parts.
    InProgress {
        /// Index of the next part to submit.
        part_idx: u16,
        upload: Box<dyn MultipartUpload>,
        /// In-flight part uploads (bounded by `max_upload_parallelism`).
        futures: JoinSet<std::result::Result<(), UploadPutError>>,
    },
    /// The writer is in the process of uploading data in a single PUT request.
    /// This happens when shutdown is called before the buffer is full.
    PuttingSingle(BoxFuture<'static, OSResult<WriteResult>>),
    /// The writer is in the process of completing the multipart upload.
    Completing(BoxFuture<'static, OSResult<WriteResult>>),
    /// The writer has been shut down and all data has been written.
    Done(WriteResult),
}
125
126/// Methods for state transitions.
127impl UploadState {
128    fn started_to_putting_single(&mut self, path: Arc<Path>, buffer: Vec<u8>) {
129        // To get owned self, we temporarily swap with Done.
130        let this = std::mem::replace(self, Self::Done(WriteResult::default()));
131        *self = match this {
132            Self::Started(store) => {
133                let fut = async move {
134                    let size = buffer.len();
135                    let res = store.put(&path, buffer.into()).await?;
136                    Ok(WriteResult {
137                        size,
138                        e_tag: res.e_tag,
139                    })
140                };
141                Self::PuttingSingle(Box::pin(fut))
142            }
143            _ => unreachable!(),
144        }
145    }
146
147    fn in_progress_to_completing(&mut self) {
148        // To get owned self, we temporarily swap with Done.
149        let this = std::mem::replace(self, Self::Done(WriteResult::default()));
150        *self = match this {
151            Self::InProgress {
152                mut upload,
153                futures,
154                ..
155            } => {
156                debug_assert!(futures.is_empty());
157                let fut = async move {
158                    let res = upload.complete().await?;
159                    Ok(WriteResult {
160                        size: 0, // This will be set properly later.
161                        e_tag: res.e_tag,
162                    })
163                };
164                Self::Completing(Box::pin(fut))
165            }
166            _ => unreachable!(),
167        };
168    }
169}
170
impl ObjectWriter {
    /// Creates a writer targeting `path` in `object_store`.
    ///
    /// No request is issued yet: data accumulates in `buffer` until it fills
    /// (multipart upload) or the writer is shut down (single PUT).
    pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result<Self> {
        Ok(Self {
            state: UploadState::Started(object_store.inner.clone()),
            cursor: 0,
            path: Arc::new(path.clone()),
            connection_resets: 0,
            buffer: Vec::with_capacity(initial_upload_size()),
            use_constant_size_upload_parts: object_store.use_constant_size_upload_parts,
        })
    }

    /// Returns the contents of `buffer` as a `Bytes` object and resets `buffer`.
    /// The new capacity of `buffer` is determined by the current part index.
    fn next_part_buffer(buffer: &mut Vec<u8>, part_idx: u16, constant_upload_size: bool) -> Bytes {
        let new_capacity = if constant_upload_size {
            // The store does not support variable part sizes, so use the initial size.
            initial_upload_size()
        } else {
            // Increase the upload size every 100 parts. This gives maximum part size of 2.5TB.
            initial_upload_size().max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
        };
        let new_buffer = Vec::with_capacity(new_capacity);
        let part = std::mem::replace(buffer, new_buffer);
        Bytes::from(part)
    }

    /// Builds a future that uploads `buffer` as part `part_idx`, optionally
    /// sleeping first (used as retry backoff).
    ///
    /// On failure, the error carries the part index and the buffer (a shallow
    /// `Bytes` clone) so the caller can resubmit the part.
    fn put_part(
        upload: &mut dyn MultipartUpload,
        buffer: Bytes,
        part_idx: u16,
        sleep: Option<std::time::Duration>,
    ) -> BoxFuture<'static, std::result::Result<(), UploadPutError>> {
        log::debug!(
            "MultipartUpload submitting part with {} bytes",
            buffer.len()
        );
        let fut = upload.put_part(buffer.clone().into());
        Box::pin(async move {
            if let Some(sleep) = sleep {
                tokio::time::sleep(sleep).await;
            }
            fut.await.map_err(|source| UploadPutError {
                part_idx,
                buffer,
                source,
            })?;
            Ok(())
        })
    }

    /// Drives all in-flight futures (upload creation, part uploads, single PUT,
    /// completion) without blocking, advancing the state machine as they finish.
    ///
    /// Polling the inner futures/JoinSet registers `cx`'s waker, so callers may
    /// return `Poll::Pending` after this and be woken when progress is possible.
    fn poll_tasks(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::result::Result<(), io::Error> {
        let mut_self = &mut *self;
        loop {
            match &mut mut_self.state {
                // Nothing is in flight in these states.
                UploadState::Started(_) | UploadState::Done(_) => break,
                UploadState::CreatingUpload(fut) => match fut.poll_unpin(cx) {
                    Poll::Ready(Ok(mut upload)) => {
                        let mut futures = JoinSet::new();

                        // The buffer is full (that is what triggered creating
                        // the upload), so submit it as part 0 immediately.
                        let data = Self::next_part_buffer(
                            &mut mut_self.buffer,
                            0,
                            mut_self.use_constant_size_upload_parts,
                        );
                        futures.spawn(Self::put_part(upload.as_mut(), data, 0, None));

                        mut_self.state = UploadState::InProgress {
                            part_idx: 1, // We just used 0
                            futures,
                            upload,
                        };
                    }
                    Poll::Ready(Err(e)) => return Err(std::io::Error::other(e)),
                    Poll::Pending => break,
                },
                UploadState::InProgress {
                    upload, futures, ..
                } => {
                    // Reap finished part uploads, retrying connection resets.
                    while let Poll::Ready(Some(res)) = futures.poll_join_next(cx) {
                        match res {
                            Ok(Ok(())) => {}
                            // The spawned task itself failed (panic/cancel).
                            Err(err) => return Err(std::io::Error::other(err)),
                            Ok(Err(UploadPutError {
                                source: OSError::Generic { source, .. },
                                part_idx,
                                buffer,
                            })) if source
                                .to_string()
                                .to_lowercase()
                                .contains("connection reset by peer") =>
                            {
                                if mut_self.connection_resets < max_conn_reset_retries() {
                                    // Retry, but only up to max_conn_reset_retries of them.
                                    mut_self.connection_resets += 1;

                                    // Resubmit with random jitter
                                    let sleep_time_ms = rand::rng().random_range(2_000..8_000);
                                    let sleep_time =
                                        std::time::Duration::from_millis(sleep_time_ms);

                                    futures.spawn(Self::put_part(
                                        upload.as_mut(),
                                        buffer,
                                        part_idx,
                                        Some(sleep_time),
                                    ));
                                } else {
                                    return Err(io::Error::new(
                                        io::ErrorKind::ConnectionReset,
                                        Box::new(ConnectionResetError {
                                            message: format!(
                                                "Hit max retries ({}) for connection reset",
                                                max_conn_reset_retries()
                                            ),
                                            source,
                                        }),
                                    ));
                                }
                            }
                            // Any other upload error is fatal.
                            Ok(Err(err)) => return Err(err.source.into()),
                        }
                    }
                    break;
                }
                UploadState::PuttingSingle(fut) | UploadState::Completing(fut) => {
                    match fut.poll_unpin(cx) {
                        Poll::Ready(Ok(mut res)) => {
                            // The completion future doesn't know the total size;
                            // fill it in from the cursor.
                            res.size = mut_self.cursor;
                            mut_self.state = UploadState::Done(res)
                        }
                        Poll::Ready(Err(e)) => return Err(std::io::Error::other(e)),
                        Poll::Pending => break,
                    }
                }
            }
        }
        Ok(())
    }

    /// Aborts an in-progress multipart upload, discarding uploaded parts.
    /// No-op in any other state. Leaves the writer in the `Done` state.
    pub async fn abort(&mut self) {
        let state = std::mem::replace(&mut self.state, UploadState::Done(WriteResult::default()));
        if let UploadState::InProgress { mut upload, .. } = state {
            let _ = upload.abort().await;
        }
    }
}
321
322impl Drop for ObjectWriter {
323    fn drop(&mut self) {
324        // If there is a multipart upload started but not finished, we should abort it.
325        if matches!(self.state, UploadState::InProgress { .. }) {
326            // Take ownership of the state.
327            let state =
328                std::mem::replace(&mut self.state, UploadState::Done(WriteResult::default()));
329            if let UploadState::InProgress { mut upload, .. } = state
330                && let Ok(handle) = Handle::try_current()
331            {
332                handle.spawn(async move {
333                    let _ = upload.abort().await;
334                });
335            }
336        }
337    }
338}
339
/// Returned error from trying to upload a part.
/// Has the part_idx and buffer so we can pass
/// them to the retry logic.
struct UploadPutError {
    /// Index of the part that failed.
    part_idx: u16,
    /// The part's data, retained so the upload can be resubmitted.
    buffer: Bytes,
    /// Underlying object-store error.
    source: OSError,
}
348
/// Error surfaced once the "connection reset by peer" retry budget is spent.
#[derive(Debug)]
struct ConnectionResetError {
    /// Human-readable context (includes the retry limit).
    message: String,
    /// The last underlying error that triggered the reset.
    source: Box<dyn std::error::Error + Send + Sync>,
}

impl std::fmt::Display for ConnectionResetError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render as "<message>: <source>".
        f.write_str(&self.message)?;
        write!(f, ": {}", self.source)
    }
}

impl std::error::Error for ConnectionResetError {}
362
impl AsyncWrite for ObjectWriter {
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
        // Drive in-flight requests first; this also registers wakers.
        self.as_mut().poll_tasks(cx)?;

        // Fill buffer up to remaining capacity.
        let remaining_capacity = self.buffer.capacity() - self.buffer.len();
        let bytes_to_write = std::cmp::min(remaining_capacity, buf.len());
        self.buffer.extend_from_slice(&buf[..bytes_to_write]);
        self.cursor += bytes_to_write;

        // Rust needs a little help to borrow self mutably and immutably at the same time
        // through a Pin.
        let mut_self = &mut *self;

        // Instantiate next request, if available.
        if mut_self.buffer.capacity() == mut_self.buffer.len() {
            match &mut mut_self.state {
                UploadState::Started(store) => {
                    // First full buffer: start the multipart upload.
                    let path = mut_self.path.clone();
                    let store = store.clone();
                    let fut = Box::pin(async move { store.put_multipart(path.as_ref()).await });
                    self.state = UploadState::CreatingUpload(fut);
                }
                UploadState::InProgress {
                    upload,
                    part_idx,
                    futures,
                    ..
                } => {
                    // TODO: Make max concurrency configurable from storage options.
                    if futures.len() < max_upload_parallelism() {
                        let data = Self::next_part_buffer(
                            &mut mut_self.buffer,
                            *part_idx,
                            mut_self.use_constant_size_upload_parts,
                        );
                        futures.spawn(
                            Self::put_part(upload.as_mut(), data, *part_idx, None)
                                .instrument(tracing::Span::current()),
                        );
                        *part_idx += 1;
                    }
                }
                _ => {}
            }
        }

        self.poll_tasks(cx)?;

        match bytes_to_write {
            // No buffer space was accepted (buffer full and all part slots
            // busy). The poll_tasks calls above polled the in-flight uploads,
            // registering wakers, so we will be re-polled when a slot frees.
            0 => Poll::Pending,
            _ => Poll::Ready(Ok(bytes_to_write)),
        }
    }

    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
        self.as_mut().poll_tasks(cx)?;

        match &self.state {
            // Nothing buffered remotely yet, or everything already uploaded.
            UploadState::Started(_) | UploadState::Done(_) => Poll::Ready(Ok(())),
            UploadState::CreatingUpload(_)
            | UploadState::Completing(_)
            | UploadState::PuttingSingle(_) => Poll::Pending,
            UploadState::InProgress { futures, .. } => {
                // Flushed once every submitted part has finished uploading.
                if futures.is_empty() {
                    Poll::Ready(Ok(()))
                } else {
                    Poll::Pending
                }
            }
        }
    }

    fn poll_shutdown(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
        loop {
            self.as_mut().poll_tasks(cx)?;

            // Rust needs a little help to borrow self mutably and immutably at the same time
            // through a Pin.
            let mut_self = &mut *self;
            match &mut mut_self.state {
                UploadState::Done(_) => return Poll::Ready(Ok(())),
                UploadState::CreatingUpload(_)
                | UploadState::PuttingSingle(_)
                | UploadState::Completing(_) => return Poll::Pending,
                UploadState::Started(_) => {
                    // If we didn't start a multipart upload, we can just do a single put.
                    let part = std::mem::take(&mut mut_self.buffer);
                    let path = mut_self.path.clone();
                    self.state.started_to_putting_single(path, part);
                }
                UploadState::InProgress {
                    upload,
                    futures,
                    part_idx,
                } => {
                    // Flush final batch
                    if !mut_self.buffer.is_empty() && futures.len() < max_upload_parallelism() {
                        // We can just use `take` since we don't need the buffer anymore.
                        let data = Bytes::from(std::mem::take(&mut mut_self.buffer));
                        futures.spawn(
                            Self::put_part(upload.as_mut(), data, *part_idx, None)
                                .instrument(tracing::Span::current()),
                        );
                        // We need to go back to beginning of loop to poll the
                        // new future and get the waker registered on the ctx.
                        continue;
                    }

                    // We handle the transition from in progress to completing here.
                    if futures.is_empty() {
                        self.state.in_progress_to_completing();
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}
493
494#[async_trait]
495impl Writer for ObjectWriter {
496    async fn tell(&mut self) -> Result<usize> {
497        Ok(self.cursor)
498    }
499
500    async fn shutdown(&mut self) -> Result<WriteResult> {
501        AsyncWriteExt::shutdown(self).await.map_err(|e| {
502            Error::io(format!(
503                "failed to shutdown object writer for {}: {}",
504                self.path, e
505            ))
506        })?;
507        if let UploadState::Done(result) = &self.state {
508            Ok(result.clone())
509        } else {
510            unreachable!()
511        }
512    }
513}
514
/// Writer for local files: writes to a temp file, then atomically persists it
/// to `path` on shutdown.
pub struct LocalWriter {
    /// Final destination path.
    path: Path,
    /// Write/persist state machine.
    state: LocalWriteState,
}
519
/// State machine for [`LocalWriter`].
#[derive(Default)]
enum LocalWriteState {
    /// Actively buffering writes into the temp file.
    Writing(WritingState),
    /// Shutdown started: persisting the temp file to the final path.
    Finishing {
        /// Total bytes written, captured before the writing state was consumed.
        size: usize,
        future: BoxFuture<'static, Result<WriteResult>>,
    },
    /// Persist finished; holds the final result.
    Done(WriteResult),
    /// Placeholder used while moving out of `Writing`; observing it means a
    /// state transition did not complete.
    #[default]
    Poisoned,
}
531
/// Live write-side data for [`LocalWriteState::Writing`].
struct WritingState {
    /// Buffered writer over the open temp file.
    writer: tokio::io::BufWriter<tokio::fs::File>,
    /// Bytes written so far.
    cursor: usize,
    /// Temp path that auto-deletes on drop; moved out and consumed by `persist()`.
    temp_path: tempfile::TempPath,
    /// Records write IOPS/bytes once the file is persisted.
    io_tracker: Arc<IOTracker>,
}
539
540impl LocalWriter {
541    pub fn new(
542        file: tokio::fs::File,
543        path: Path,
544        temp_path: tempfile::TempPath,
545        io_tracker: Arc<IOTracker>,
546    ) -> Self {
547        Self {
548            path,
549            state: LocalWriteState::Writing(WritingState {
550                writer: tokio::io::BufWriter::new(file),
551                cursor: 0,
552                temp_path,
553                io_tracker,
554            }),
555        }
556    }
557
558    fn already_closed_err(path: &Path) -> io::Error {
559        io::Error::other(format!(
560            "cannot write to LocalWriter for {} after shutdown",
561            path
562        ))
563    }
564
565    fn poisoned_err(path: &Path) -> io::Error {
566        io::Error::other(format!("LocalWriter for {} is in poisoned state", path))
567    }
568
569    async fn persist(
570        temp_path: tempfile::TempPath,
571        final_path: Path,
572        size: usize,
573        io_tracker: Arc<IOTracker>,
574    ) -> Result<WriteResult> {
575        let local_path = crate::local::to_local_path(&final_path);
576        let e_tag = tokio::task::spawn_blocking(move || -> Result<String> {
577            temp_path.persist(&local_path).map_err(|e| {
578                Error::io(format!(
579                    "failed to persist temp file to {}: {}",
580                    local_path, e.error
581                ))
582            })?;
583
584            let metadata = std::fs::metadata(&local_path).map_err(|e| {
585                Error::io(format!("failed to read metadata for {}: {}", local_path, e))
586            })?;
587            Ok(get_etag(&metadata))
588        })
589        .await
590        .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
591
592        io_tracker.record_write("put", final_path, size as u64);
593
594        Ok(WriteResult {
595            size,
596            e_tag: Some(e_tag),
597        })
598    }
599}
600
impl AsyncWrite for LocalWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        if let LocalWriteState::Writing(state) = &mut self.state {
            let poll = Pin::new(&mut state.writer).poll_write(cx, buf);
            // Track accepted bytes so tell() and the final size are accurate.
            if let Poll::Ready(Ok(n)) = &poll {
                state.cursor += *n;
            }
            poll
        } else {
            // Writes after shutdown has started are an error.
            Poll::Ready(Err(Self::already_closed_err(&self.path)))
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        if let LocalWriteState::Writing(state) = &mut self.state {
            Pin::new(&mut state.writer).poll_flush(cx)
        } else {
            Poll::Ready(Err(Self::already_closed_err(&self.path)))
        }
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        let mut_self = &mut *self;
        loop {
            match &mut mut_self.state {
                LocalWriteState::Writing(state) => {
                    // First flush and close the buffered writer.
                    if Pin::new(&mut state.writer).poll_shutdown(cx).is_pending() {
                        return Poll::Pending;
                    }

                    // Write is complete, we can transition to persisting.
                    // The state is briefly Poisoned while we move `state` out;
                    // it only stays Poisoned if construction below panics.
                    let LocalWriteState::Writing(state) =
                        std::mem::replace(&mut mut_self.state, LocalWriteState::Poisoned)
                    else {
                        unreachable!()
                    };
                    let size = state.cursor;
                    mut_self.state = LocalWriteState::Finishing {
                        size,
                        future: Box::pin(Self::persist(
                            state.temp_path,
                            mut_self.path.clone(),
                            size,
                            state.io_tracker,
                        )),
                    };
                }
                LocalWriteState::Finishing { future, .. } => match future.poll_unpin(cx) {
                    Poll::Ready(Ok(result)) => mut_self.state = LocalWriteState::Done(result),
                    Poll::Ready(Err(e)) => {
                        return Poll::Ready(Err(io::Error::other(e)));
                    }
                    Poll::Pending => return Poll::Pending,
                },
                // Shutdown is idempotent once complete.
                LocalWriteState::Done(_) => return Poll::Ready(Ok(())),
                LocalWriteState::Poisoned => {
                    return Poll::Ready(Err(Self::poisoned_err(&self.path)));
                }
            }
        }
    }
}
673
674#[async_trait]
675impl Writer for LocalWriter {
676    async fn tell(&mut self) -> Result<usize> {
677        match &mut self.state {
678            LocalWriteState::Writing(state) => Ok(state.cursor),
679            LocalWriteState::Finishing { size, .. } => Ok(*size),
680            LocalWriteState::Done(result) => Ok(result.size),
681            LocalWriteState::Poisoned => Err(Self::poisoned_err(&self.path).into()),
682        }
683    }
684
685    async fn shutdown(&mut self) -> Result<WriteResult> {
686        AsyncWriteExt::shutdown(self).await.map_err(|e| {
687            Error::io(format!(
688                "failed to shutdown local writer for {}: {}",
689                self.path, e
690            ))
691        })?;
692
693        match &self.state {
694            LocalWriteState::Done(result) => Ok(result.clone()),
695            _ => unreachable!(),
696        }
697    }
698}
699
// Based on object store's implementation.
/// Derives an ETag for a local file from its inode, mtime, and size.
pub fn get_etag(metadata: &std::fs::Metadata) -> String {
    let size = metadata.len();
    let inode = get_inode(metadata);
    // Microseconds since the Unix epoch; 0 if mtime is unavailable or pre-epoch.
    let mtime = metadata
        .modified()
        .map(|mtime| {
            mtime
                .duration_since(std::time::SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
        })
        .unwrap_or_default()
        .as_micros();

    // Use an ETag scheme based on that used by many popular HTTP servers
    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
    format!("{inode:x}-{mtime:x}-{size:x}")
}

/// Inode number of the file (unix only).
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    std::os::unix::fs::MetadataExt::ino(metadata)
}

/// Inodes are not meaningful on this platform; use a constant 0.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
725
#[cfg(test)]
mod tests {
    use tokio::io::AsyncWriteExt;

    use super::*;

    #[tokio::test]
    async fn test_write() {
        let store = LanceObjectStore::memory();

        // Small writes stay in the buffer and go out as a single PUT.
        let mut object_writer = ObjectWriter::new(&store, &Path::from("/foo"))
            .await
            .unwrap();
        assert_eq!(object_writer.tell().await.unwrap(), 0);

        let buf = vec![0; 256];
        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
        assert_eq!(object_writer.tell().await.unwrap(), 256);

        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
        assert_eq!(object_writer.tell().await.unwrap(), 512);

        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
        assert_eq!(object_writer.tell().await.unwrap(), 256 * 3);

        let res = Writer::shutdown(&mut object_writer).await.unwrap();
        assert_eq!(res.size, 256 * 3);

        // Trigger multi part upload
        let mut object_writer = ObjectWriter::new(&store, &Path::from("/bar"))
            .await
            .unwrap();
        // Each write is 2/3 of the initial part size, so five writes overflow
        // the first part buffer and exercise the multipart path.
        let buf = vec![0; INITIAL_UPLOAD_STEP / 3 * 2];
        for i in 0..5 {
            // Write more data to trigger the multipart upload
            // This should be enough to trigger a multipart upload
            object_writer.write_all(buf.as_slice()).await.unwrap();
            // Check the cursor
            assert_eq!(object_writer.tell().await.unwrap(), (i + 1) * buf.len());
        }
        let res = Writer::shutdown(&mut object_writer).await.unwrap();
        assert_eq!(res.size, buf.len() * 5);
    }

    #[tokio::test]
    async fn test_abort_write() {
        let store = LanceObjectStore::memory();

        // Aborting a writer that never uploaded anything is a harmless no-op.
        let mut object_writer = ObjectWriter::new(&store, &Path::from("/foo"))
            .await
            .unwrap();
        object_writer.abort().await;
    }

    #[tokio::test]
    async fn test_local_writer_shutdown() {
        let tmp = lance_core::utils::tempfile::TempStdDir::default();
        let file_path = tmp.join("test_local_writer.bin");
        let os_path = Path::from_absolute_path(&file_path).unwrap();
        let io_tracker = Arc::new(IOTracker::default());

        let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap();
        let temp_file_path = named_temp.path().to_owned();
        let (std_file, temp_path) = named_temp.into_parts();
        let file = tokio::fs::File::from_std(std_file);
        let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker.clone());

        let data = b"hello local writer";
        writer.write_all(data).await.unwrap();

        // Before shutdown, the final path should not exist
        assert!(!file_path.exists());
        // But the temp file should exist
        assert!(temp_file_path.exists());

        let result = Writer::shutdown(&mut writer).await.unwrap();
        assert_eq!(result.size, data.len());
        assert!(result.e_tag.is_some());
        assert!(!result.e_tag.as_ref().unwrap().is_empty());

        // After shutdown, the final path should exist and temp should be gone
        assert!(file_path.exists());
        assert!(!temp_file_path.exists());

        // Persisting counts as exactly one tracked write of `data.len()` bytes.
        let stats = io_tracker.stats();
        assert_eq!(stats.write_iops, 1);
        assert_eq!(stats.written_bytes, data.len() as u64);
    }

    #[tokio::test]
    async fn test_local_writer_drop_cleans_up() {
        let tmp = lance_core::utils::tempfile::TempStdDir::default();
        let file_path = tmp.join("test_drop.bin");
        let os_path = Path::from_absolute_path(&file_path).unwrap();
        let io_tracker = Arc::new(IOTracker::default());

        let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap();
        let temp_file_path = named_temp.path().to_owned();
        let (std_file, temp_path) = named_temp.into_parts();
        let file = tokio::fs::File::from_std(std_file);
        let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker);

        writer.write_all(b"some data").await.unwrap();
        assert!(temp_file_path.exists());

        // Drop without shutdown should clean up the temp file
        drop(writer);
        assert!(!temp_file_path.exists());
        assert!(!file_path.exists());
    }

    #[test]
    fn clamp_initial_upload_size_below_min_is_clamped_up() {
        assert_eq!(clamp_initial_upload_size(0), (INITIAL_UPLOAD_STEP, true));
        assert_eq!(
            clamp_initial_upload_size(INITIAL_UPLOAD_STEP - 1),
            (INITIAL_UPLOAD_STEP, true)
        );
    }

    #[test]
    fn clamp_initial_upload_size_within_range_is_unchanged() {
        assert_eq!(
            clamp_initial_upload_size(INITIAL_UPLOAD_STEP),
            (INITIAL_UPLOAD_STEP, false)
        );
        assert_eq!(
            clamp_initial_upload_size(MAX_UPLOAD_PART_SIZE),
            (MAX_UPLOAD_PART_SIZE, false)
        );
        let mid = INITIAL_UPLOAD_STEP * 8; // 40MB, in range
        assert_eq!(clamp_initial_upload_size(mid), (mid, false));
    }

    #[test]
    fn clamp_initial_upload_size_above_max_is_clamped_down() {
        assert_eq!(
            clamp_initial_upload_size(MAX_UPLOAD_PART_SIZE + 1),
            (MAX_UPLOAD_PART_SIZE, true)
        );
        assert_eq!(
            clamp_initial_upload_size(usize::MAX),
            (MAX_UPLOAD_PART_SIZE, true)
        );
    }
}