1use std::io;
5use std::pin::Pin;
6use std::sync::{Arc, OnceLock};
7use std::task::Poll;
8
9use crate::object_store::ObjectStore as LanceObjectStore;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::FutureExt;
13use futures::future::BoxFuture;
14use object_store::MultipartUpload;
15use object_store::{Error as OSError, ObjectStore, Result as OSResult, path::Path};
16use rand::Rng;
17use tokio::io::{AsyncWrite, AsyncWriteExt};
18use tokio::task::JoinSet;
19
20use lance_core::{Error, Result};
21use tracing::Instrument;
22
23use crate::traits::Writer;
24use crate::utils::tracking_store::IOTracker;
25use tokio::runtime::Handle;
26
/// Default/starting size of one upload part: 5 MiB (also the minimum accepted by `clamp_initial_upload_size`).
const INITIAL_UPLOAD_STEP: usize = 1024 * 1024 * 5;
29
/// Maximum number of part-upload futures kept in flight at once.
///
/// Read once from the `LANCE_UPLOAD_CONCURRENCY` environment variable and
/// cached for the lifetime of the process. Falls back to 10 when the
/// variable is unset or does not parse as a `usize`.
fn max_upload_parallelism() -> usize {
    static MAX_UPLOAD_PARALLELISM: OnceLock<usize> = OnceLock::new();
    *MAX_UPLOAD_PARALLELISM.get_or_init(|| {
        match std::env::var("LANCE_UPLOAD_CONCURRENCY") {
            Ok(value) => value.parse::<usize>().unwrap_or(10),
            Err(_) => 10,
        }
    })
}
39
/// How many times a part upload is retried after "connection reset by peer".
///
/// Read once from the `LANCE_CONN_RESET_RETRIES` environment variable and
/// cached for the lifetime of the process. Falls back to 20 when the
/// variable is unset or does not parse as a `u16`.
fn max_conn_reset_retries() -> u16 {
    static MAX_CONN_RESET_RETRIES: OnceLock<u16> = OnceLock::new();
    *MAX_CONN_RESET_RETRIES.get_or_init(|| {
        match std::env::var("LANCE_CONN_RESET_RETRIES") {
            Ok(value) => value.parse::<u16>().unwrap_or(20),
            Err(_) => 20,
        }
    })
}
49
/// Largest size allowed for a single upload part: 5 GiB.
const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
52
53fn clamp_initial_upload_size(raw: usize) -> (usize, bool) {
56 let clamped = raw.clamp(INITIAL_UPLOAD_STEP, MAX_UPLOAD_PART_SIZE);
57 (clamped, clamped != raw)
58}
59
60fn initial_upload_size() -> usize {
61 static LANCE_INITIAL_UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
62 *LANCE_INITIAL_UPLOAD_SIZE.get_or_init(|| {
63 let Some(raw) = std::env::var("LANCE_INITIAL_UPLOAD_SIZE")
64 .ok()
65 .and_then(|s| s.parse::<usize>().ok())
66 else {
67 return INITIAL_UPLOAD_STEP;
68 };
69 let (clamped, was_clamped) = clamp_initial_upload_size(raw);
70 if was_clamped {
71 tracing::warn!(
73 requested = raw,
74 clamped,
75 "LANCE_INITIAL_UPLOAD_SIZE must be between 5MB and 5GB; clamping to valid range"
76 );
77 }
78 clamped
79 })
80}
81
/// [`AsyncWrite`] adapter that streams bytes to object storage.
///
/// Data is staged in a local buffer and sent as a single `put` when small;
/// once the buffer fills a whole part, the writer switches to a multipart
/// upload driven by the [`UploadState`] state machine.
pub struct ObjectWriter {
    // Where we are in the single-put / multipart lifecycle.
    state: UploadState,
    // Destination path within the object store.
    path: Arc<Path>,
    // Total bytes accepted from callers so far (what `tell` reports).
    cursor: usize,
    // Count of "connection reset by peer" retries performed so far.
    connection_resets: u16,
    // Bytes staged for the next upload part.
    buffer: Vec<u8>,
    // When true, every part keeps the initial part size instead of growing;
    // presumably for stores that require uniform part sizes — TODO confirm.
    use_constant_size_upload_parts: bool,
}
98
/// Outcome of a finished write: total size and the store-assigned e-tag.
#[derive(Debug, Clone, Default)]
pub struct WriteResult {
    // Total number of bytes written to the object.
    pub size: usize,
    // Entity tag returned by the store, when available.
    pub e_tag: Option<String>,
}
104
/// State machine for an [`ObjectWriter`] upload.
enum UploadState {
    /// Nothing sent yet; data is only accumulating in the local buffer.
    Started(Arc<dyn ObjectStore>),
    /// Waiting for the store to create a multipart upload.
    CreatingUpload(BoxFuture<'static, OSResult<Box<dyn MultipartUpload>>>),
    /// Multipart upload active; parts are uploaded concurrently.
    InProgress {
        // Index of the next part to submit.
        part_idx: u16,
        upload: Box<dyn MultipartUpload>,
        // In-flight part-upload tasks.
        futures: JoinSet<std::result::Result<(), UploadPutError>>,
    },
    /// Uploading the whole (small) object with one `put`.
    PuttingSingle(BoxFuture<'static, OSResult<WriteResult>>),
    /// Finalizing the multipart upload.
    Completing(BoxFuture<'static, OSResult<WriteResult>>),
    /// Upload finished; holds the final result.
    Done(WriteResult),
}
125
impl UploadState {
    /// Transition `Started` -> `PuttingSingle`, uploading `buffer` as one object.
    ///
    /// Unreachable panic if invoked in any other state.
    fn started_to_putting_single(&mut self, path: Arc<Path>, buffer: Vec<u8>) {
        // Park a placeholder so the store can be moved out of `self`.
        let this = std::mem::replace(self, Self::Done(WriteResult::default()));
        *self = match this {
            Self::Started(store) => {
                let fut = async move {
                    let size = buffer.len();
                    let res = store.put(&path, buffer.into()).await?;
                    Ok(WriteResult {
                        size,
                        e_tag: res.e_tag,
                    })
                };
                Self::PuttingSingle(Box::pin(fut))
            }
            _ => unreachable!(),
        }
    }

    /// Transition `InProgress` -> `Completing` once every part has uploaded.
    ///
    /// Unreachable panic if invoked in any other state.
    fn in_progress_to_completing(&mut self) {
        // Park a placeholder so the upload handle can be moved out of `self`.
        let this = std::mem::replace(self, Self::Done(WriteResult::default()));
        *self = match this {
            Self::InProgress {
                mut upload,
                futures,
                ..
            } => {
                // All part futures must be drained before completing.
                debug_assert!(futures.is_empty());
                let fut = async move {
                    let res = upload.complete().await?;
                    Ok(WriteResult {
                        // Size is unknown here; poll_tasks fills it in from the cursor.
                        size: 0, e_tag: res.e_tag,
                    })
                };
                Self::Completing(Box::pin(fut))
            }
            _ => unreachable!(),
        };
    }
}
170
impl ObjectWriter {
    /// Create a writer targeting `path` inside `object_store`.
    pub async fn new(object_store: &LanceObjectStore, path: &Path) -> Result<Self> {
        Ok(Self {
            state: UploadState::Started(object_store.inner.clone()),
            cursor: 0,
            path: Arc::new(path.clone()),
            connection_resets: 0,
            buffer: Vec::with_capacity(initial_upload_size()),
            use_constant_size_upload_parts: object_store.use_constant_size_upload_parts,
        })
    }

    /// Swap out the filled part buffer, installing a fresh one sized for the
    /// next part, and return the old contents as `Bytes`.
    ///
    /// Unless constant part sizes are required, capacity grows by one
    /// `INITIAL_UPLOAD_STEP` every 100 parts so huge files need fewer parts.
    fn next_part_buffer(buffer: &mut Vec<u8>, part_idx: u16, constant_upload_size: bool) -> Bytes {
        let new_capacity = if constant_upload_size {
            initial_upload_size()
        } else {
            initial_upload_size().max(((part_idx / 100) as usize + 1) * INITIAL_UPLOAD_STEP)
        };
        let new_buffer = Vec::with_capacity(new_capacity);
        let part = std::mem::replace(buffer, new_buffer);
        Bytes::from(part)
    }

    /// Build a future that uploads one part, optionally sleeping first
    /// (used as retry backoff). On failure the part data and index travel
    /// in the error so the part can be resubmitted.
    fn put_part(
        upload: &mut dyn MultipartUpload,
        buffer: Bytes,
        part_idx: u16,
        sleep: Option<std::time::Duration>,
    ) -> BoxFuture<'static, std::result::Result<(), UploadPutError>> {
        log::debug!(
            "MultipartUpload submitting part with {} bytes",
            buffer.len()
        );
        // `Bytes::clone` is a cheap refcount bump; the copy is kept for retry.
        let fut = upload.put_part(buffer.clone().into());
        Box::pin(async move {
            if let Some(sleep) = sleep {
                tokio::time::sleep(sleep).await;
            }
            fut.await.map_err(|source| UploadPutError {
                part_idx,
                buffer,
                source,
            })?;
            Ok(())
        })
    }

    /// Drive all pending work in the current state as far as possible without
    /// blocking: advances state transitions, reaps finished part uploads, and
    /// schedules retries for connection resets.
    fn poll_tasks(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::result::Result<(), io::Error> {
        let mut_self = &mut *self;
        loop {
            match &mut mut_self.state {
                // Nothing asynchronous to drive in these states.
                UploadState::Started(_) | UploadState::Done(_) => break,
                UploadState::CreatingUpload(fut) => match fut.poll_unpin(cx) {
                    Poll::Ready(Ok(mut upload)) => {
                        let mut futures = JoinSet::new();

                        // Immediately submit the buffered data as part 0.
                        let data = Self::next_part_buffer(
                            &mut mut_self.buffer,
                            0,
                            mut_self.use_constant_size_upload_parts,
                        );
                        futures.spawn(Self::put_part(upload.as_mut(), data, 0, None));

                        mut_self.state = UploadState::InProgress {
                            part_idx: 1, futures,
                            upload,
                        };
                    }
                    Poll::Ready(Err(e)) => return Err(std::io::Error::other(e)),
                    Poll::Pending => break,
                },
                UploadState::InProgress {
                    upload, futures, ..
                } => {
                    // Reap completed part uploads, retrying connection resets.
                    while let Poll::Ready(Some(res)) = futures.poll_join_next(cx) {
                        match res {
                            Ok(Ok(())) => {}
                            // The upload task itself panicked or was cancelled.
                            Err(err) => return Err(std::io::Error::other(err)),
                            Ok(Err(UploadPutError {
                                source: OSError::Generic { source, .. },
                                part_idx,
                                buffer,
                            })) if source
                                .to_string()
                                .to_lowercase()
                                .contains("connection reset by peer") =>
                            {
                                if mut_self.connection_resets < max_conn_reset_retries() {
                                    mut_self.connection_resets += 1;

                                    // Retry after a randomized 2-8 second backoff.
                                    let sleep_time_ms = rand::rng().random_range(2_000..8_000);
                                    let sleep_time =
                                        std::time::Duration::from_millis(sleep_time_ms);

                                    futures.spawn(Self::put_part(
                                        upload.as_mut(),
                                        buffer,
                                        part_idx,
                                        Some(sleep_time),
                                    ));
                                } else {
                                    return Err(io::Error::new(
                                        io::ErrorKind::ConnectionReset,
                                        Box::new(ConnectionResetError {
                                            message: format!(
                                                "Hit max retries ({}) for connection reset",
                                                max_conn_reset_retries()
                                            ),
                                            source,
                                        }),
                                    ));
                                }
                            }
                            // Any other upload error is fatal.
                            Ok(Err(err)) => return Err(err.source.into()),
                        }
                    }
                    break;
                }
                UploadState::PuttingSingle(fut) | UploadState::Completing(fut) => {
                    match fut.poll_unpin(cx) {
                        Poll::Ready(Ok(mut res)) => {
                            // The finishing future doesn't know the total size;
                            // fill it in from the writer's cursor.
                            res.size = mut_self.cursor;
                            mut_self.state = UploadState::Done(res)
                        }
                        Poll::Ready(Err(e)) => return Err(std::io::Error::other(e)),
                        Poll::Pending => break,
                    }
                }
            }
        }
        Ok(())
    }

    /// Abort an in-progress multipart upload, discarding uploaded parts.
    /// A no-op in every other state.
    pub async fn abort(&mut self) {
        let state = std::mem::replace(&mut self.state, UploadState::Done(WriteResult::default()));
        if let UploadState::InProgress { mut upload, .. } = state {
            let _ = upload.abort().await;
        }
    }
}
321
impl Drop for ObjectWriter {
    fn drop(&mut self) {
        // If dropped mid-upload, try to abort the multipart upload in the
        // background so partially uploaded parts don't linger in the store.
        if matches!(self.state, UploadState::InProgress { .. }) {
            let state =
                std::mem::replace(&mut self.state, UploadState::Done(WriteResult::default()));
            if let UploadState::InProgress { mut upload, .. } = state
                && let Ok(handle) = Handle::try_current()
            {
                // Best-effort: only possible when a tokio runtime is still alive.
                handle.spawn(async move {
                    let _ = upload.abort().await;
                });
            }
        }
    }
}
339
/// Error from uploading one part, carrying everything needed to retry it.
struct UploadPutError {
    // Index of the part that failed.
    part_idx: u16,
    // The part's data, retained so the upload can be resubmitted.
    buffer: Bytes,
    // Underlying object-store error.
    source: OSError,
}
348
/// Raised after exhausting retries for "connection reset by peer" failures.
#[derive(Debug)]
struct ConnectionResetError {
    // Human-readable context (retry count, etc.).
    message: String,
    // The last underlying transport error.
    source: Box<dyn std::error::Error + Send + Sync>,
}
354
355impl std::error::Error for ConnectionResetError {}
356
impl std::fmt::Display for ConnectionResetError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Rendered as "<context message>: <underlying error>".
        write!(f, "{}: {}", self.message, self.source)
    }
}
362
impl AsyncWrite for ObjectWriter {
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
        // Make progress on any in-flight work before accepting more data.
        self.as_mut().poll_tasks(cx)?;

        // Stage as much of `buf` as fits in the current part buffer.
        let remaining_capacity = self.buffer.capacity() - self.buffer.len();
        let bytes_to_write = std::cmp::min(remaining_capacity, buf.len());
        self.buffer.extend_from_slice(&buf[..bytes_to_write]);
        self.cursor += bytes_to_write;

        let mut_self = &mut *self;

        // A full buffer means one part's worth of data is ready to ship.
        if mut_self.buffer.capacity() == mut_self.buffer.len() {
            match &mut mut_self.state {
                UploadState::Started(store) => {
                    // First full part: kick off creation of a multipart upload.
                    let path = mut_self.path.clone();
                    let store = store.clone();
                    let fut = Box::pin(async move { store.put_multipart(path.as_ref()).await });
                    self.state = UploadState::CreatingUpload(fut);
                }
                UploadState::InProgress {
                    upload,
                    part_idx,
                    futures,
                    ..
                } => {
                    // Submit the part only while under the parallelism cap;
                    // otherwise the buffer stays full and we report Pending below.
                    if futures.len() < max_upload_parallelism() {
                        let data = Self::next_part_buffer(
                            &mut mut_self.buffer,
                            *part_idx,
                            mut_self.use_constant_size_upload_parts,
                        );
                        futures.spawn(
                            Self::put_part(upload.as_mut(), data, *part_idx, None)
                                .instrument(tracing::Span::current()),
                        );
                        *part_idx += 1;
                    }
                }
                _ => {}
            }
        }

        // Poll again so any newly spawned futures register their wakers.
        self.poll_tasks(cx)?;

        match bytes_to_write {
            // Buffer was full and couldn't be drained: ask the caller to wait.
            0 => Poll::Pending,
            _ => Poll::Ready(Ok(bytes_to_write)),
        }
    }

    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
        self.as_mut().poll_tasks(cx)?;

        match &self.state {
            // Nothing in flight (data may still sit in the local buffer).
            UploadState::Started(_) | UploadState::Done(_) => Poll::Ready(Ok(())),
            UploadState::CreatingUpload(_)
            | UploadState::Completing(_)
            | UploadState::PuttingSingle(_) => Poll::Pending,
            UploadState::InProgress { futures, .. } => {
                if futures.is_empty() {
                    Poll::Ready(Ok(()))
                } else {
                    Poll::Pending
                }
            }
        }
    }

    fn poll_shutdown(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
        loop {
            self.as_mut().poll_tasks(cx)?;

            let mut_self = &mut *self;
            match &mut mut_self.state {
                UploadState::Done(_) => return Poll::Ready(Ok(())),
                UploadState::CreatingUpload(_)
                | UploadState::PuttingSingle(_)
                | UploadState::Completing(_) => return Poll::Pending,
                UploadState::Started(_) => {
                    // Never crossed one part: upload everything with a single put.
                    let part = std::mem::take(&mut mut_self.buffer);
                    let path = mut_self.path.clone();
                    self.state.started_to_putting_single(path, part);
                }
                UploadState::InProgress {
                    upload,
                    futures,
                    part_idx,
                } => {
                    // Flush the final (possibly short) part when there's room.
                    if !mut_self.buffer.is_empty() && futures.len() < max_upload_parallelism() {
                        let data = Bytes::from(std::mem::take(&mut mut_self.buffer));
                        futures.spawn(
                            Self::put_part(upload.as_mut(), data, *part_idx, None)
                                .instrument(tracing::Span::current()),
                        );
                        continue;
                    }

                    if futures.is_empty() {
                        // Every part uploaded; move on to completion.
                        self.state.in_progress_to_completing();
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}
493
#[async_trait]
impl Writer for ObjectWriter {
    /// Current write position, i.e. total bytes accepted so far.
    async fn tell(&mut self) -> Result<usize> {
        Ok(self.cursor)
    }

    /// Finish the upload and return the final size and e-tag.
    async fn shutdown(&mut self) -> Result<WriteResult> {
        AsyncWriteExt::shutdown(self).await.map_err(|e| {
            Error::io(format!(
                "failed to shutdown object writer for {}: {}",
                self.path, e
            ))
        })?;
        // A successful shutdown always leaves the state machine in `Done`.
        if let UploadState::Done(result) = &self.state {
            Ok(result.clone())
        } else {
            unreachable!()
        }
    }
}
514
/// Writer that streams to a local temp file, then atomically persists it to
/// the final path on shutdown.
pub struct LocalWriter {
    // Final destination path.
    path: Path,
    // Current phase of the write (writing / persisting / done).
    state: LocalWriteState,
}
519
/// State machine for [`LocalWriter`].
#[derive(Default)]
enum LocalWriteState {
    /// Actively accepting writes into the buffered temp file.
    Writing(WritingState),
    /// Temp file closed; persisting it to the final path.
    Finishing {
        // Bytes written, captured before the writing state was consumed.
        size: usize,
        future: BoxFuture<'static, Result<WriteResult>>,
    },
    /// Persist complete; holds the final result.
    Done(WriteResult),
    /// Placeholder left behind if a state transition was interrupted.
    #[default]
    Poisoned,
}
531
/// Mutable state held while a [`LocalWriter`] is accepting data.
struct WritingState {
    // Buffered async handle to the temp file.
    writer: tokio::io::BufWriter<tokio::fs::File>,
    // Bytes written so far.
    cursor: usize,
    // Temp file path; removes the file on drop unless persisted first.
    temp_path: tempfile::TempPath,
    // Records write IOPS/byte counts for stats.
    io_tracker: Arc<IOTracker>,
}
539
impl LocalWriter {
    /// Wrap an already-open temp `file` that will be persisted to `path`
    /// when the writer shuts down.
    pub fn new(
        file: tokio::fs::File,
        path: Path,
        temp_path: tempfile::TempPath,
        io_tracker: Arc<IOTracker>,
    ) -> Self {
        Self {
            path,
            state: LocalWriteState::Writing(WritingState {
                writer: tokio::io::BufWriter::new(file),
                cursor: 0,
                temp_path,
                io_tracker,
            }),
        }
    }

    /// Error returned for writes attempted after the writer shut down.
    fn already_closed_err(path: &Path) -> io::Error {
        io::Error::other(format!(
            "cannot write to LocalWriter for {} after shutdown",
            path
        ))
    }

    /// Error returned when a state transition was interrupted mid-way.
    fn poisoned_err(path: &Path) -> io::Error {
        io::Error::other(format!("LocalWriter for {} is in poisoned state", path))
    }

    /// Move the temp file to `final_path`, derive an e-tag from the resulting
    /// file's metadata, and record the write in the IO tracker.
    async fn persist(
        temp_path: tempfile::TempPath,
        final_path: Path,
        size: usize,
        io_tracker: Arc<IOTracker>,
    ) -> Result<WriteResult> {
        let local_path = crate::local::to_local_path(&final_path);
        // persist + metadata are blocking filesystem calls; keep them off
        // the async executor threads.
        let e_tag = tokio::task::spawn_blocking(move || -> Result<String> {
            temp_path.persist(&local_path).map_err(|e| {
                Error::io(format!(
                    "failed to persist temp file to {}: {}",
                    local_path, e.error
                ))
            })?;

            let metadata = std::fs::metadata(&local_path).map_err(|e| {
                Error::io(format!("failed to read metadata for {}: {}", local_path, e))
            })?;
            Ok(get_etag(&metadata))
        })
        .await
        .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;

        io_tracker.record_write("put", final_path, size as u64);

        Ok(WriteResult {
            size,
            e_tag: Some(e_tag),
        })
    }
}
600
601impl AsyncWrite for LocalWriter {
602 fn poll_write(
603 mut self: Pin<&mut Self>,
604 cx: &mut std::task::Context<'_>,
605 buf: &[u8],
606 ) -> Poll<std::result::Result<usize, std::io::Error>> {
607 if let LocalWriteState::Writing(state) = &mut self.state {
608 let poll = Pin::new(&mut state.writer).poll_write(cx, buf);
609 if let Poll::Ready(Ok(n)) = &poll {
610 state.cursor += *n;
611 }
612 poll
613 } else {
614 Poll::Ready(Err(Self::already_closed_err(&self.path)))
615 }
616 }
617
618 fn poll_flush(
619 mut self: Pin<&mut Self>,
620 cx: &mut std::task::Context<'_>,
621 ) -> Poll<std::result::Result<(), std::io::Error>> {
622 if let LocalWriteState::Writing(state) = &mut self.state {
623 Pin::new(&mut state.writer).poll_flush(cx)
624 } else {
625 Poll::Ready(Err(Self::already_closed_err(&self.path)))
626 }
627 }
628
629 fn poll_shutdown(
630 mut self: Pin<&mut Self>,
631 cx: &mut std::task::Context<'_>,
632 ) -> Poll<std::result::Result<(), std::io::Error>> {
633 let mut_self = &mut *self;
634 loop {
635 match &mut mut_self.state {
636 LocalWriteState::Writing(state) => {
637 if Pin::new(&mut state.writer).poll_shutdown(cx).is_pending() {
638 return Poll::Pending;
639 }
640
641 let LocalWriteState::Writing(state) =
643 std::mem::replace(&mut mut_self.state, LocalWriteState::Poisoned)
644 else {
645 unreachable!()
646 };
647 let size = state.cursor;
648 mut_self.state = LocalWriteState::Finishing {
649 size,
650 future: Box::pin(Self::persist(
651 state.temp_path,
652 mut_self.path.clone(),
653 size,
654 state.io_tracker,
655 )),
656 };
657 }
658 LocalWriteState::Finishing { future, .. } => match future.poll_unpin(cx) {
659 Poll::Ready(Ok(result)) => mut_self.state = LocalWriteState::Done(result),
660 Poll::Ready(Err(e)) => {
661 return Poll::Ready(Err(io::Error::other(e)));
662 }
663 Poll::Pending => return Poll::Pending,
664 },
665 LocalWriteState::Done(_) => return Poll::Ready(Ok(())),
666 LocalWriteState::Poisoned => {
667 return Poll::Ready(Err(Self::poisoned_err(&self.path)));
668 }
669 }
670 }
671 }
672}
673
#[async_trait]
impl Writer for LocalWriter {
    /// Current write position, valid in every phase of the writer.
    async fn tell(&mut self) -> Result<usize> {
        match &mut self.state {
            LocalWriteState::Writing(state) => Ok(state.cursor),
            LocalWriteState::Finishing { size, .. } => Ok(*size),
            LocalWriteState::Done(result) => Ok(result.size),
            LocalWriteState::Poisoned => Err(Self::poisoned_err(&self.path).into()),
        }
    }

    /// Flush, persist the temp file to the final path, and return the result.
    async fn shutdown(&mut self) -> Result<WriteResult> {
        AsyncWriteExt::shutdown(self).await.map_err(|e| {
            Error::io(format!(
                "failed to shutdown local writer for {}: {}",
                self.path, e
            ))
        })?;

        // A successful shutdown always ends in `Done`.
        match &self.state {
            LocalWriteState::Done(result) => Ok(result.clone()),
            _ => unreachable!(),
        }
    }
}
699
/// Build a weak e-tag for a local file from its inode, modification time,
/// and size, formatted as `"{inode:x}-{mtime_micros:x}-{size:x}"`.
pub fn get_etag(metadata: &std::fs::Metadata) -> String {
    // Microseconds since the Unix epoch; 0 when mtime is unavailable
    // or predates the epoch.
    let mtime = match metadata
        .modified()
        .ok()
        .and_then(|t| t.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
    {
        Some(elapsed) => elapsed.as_micros(),
        None => 0,
    };
    let inode = get_inode(metadata);
    let size = metadata.len();

    format!("{inode:x}-{mtime:x}-{size:x}")
}

/// Inode number of the file (unix only).
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    std::os::unix::fs::MetadataExt::ino(metadata)
}

/// Platforms without inodes report 0 in the e-tag's first field.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
725
#[cfg(test)]
mod tests {
    use tokio::io::AsyncWriteExt;

    use super::*;

    // Small writes take the single-put path; writes that cross a part
    // boundary exercise the multipart machinery.
    #[tokio::test]
    async fn test_write() {
        let store = LanceObjectStore::memory();

        let mut object_writer = ObjectWriter::new(&store, &Path::from("/foo"))
            .await
            .unwrap();
        assert_eq!(object_writer.tell().await.unwrap(), 0);

        // Small writes stay buffered; `tell` tracks the logical cursor.
        let buf = vec![0; 256];
        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
        assert_eq!(object_writer.tell().await.unwrap(), 256);

        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
        assert_eq!(object_writer.tell().await.unwrap(), 512);

        assert_eq!(object_writer.write(buf.as_slice()).await.unwrap(), 256);
        assert_eq!(object_writer.tell().await.unwrap(), 256 * 3);

        let res = Writer::shutdown(&mut object_writer).await.unwrap();
        assert_eq!(res.size, 256 * 3);

        // Five writes of 2/3 of a part each cross several part boundaries.
        let mut object_writer = ObjectWriter::new(&store, &Path::from("/bar"))
            .await
            .unwrap();
        let buf = vec![0; INITIAL_UPLOAD_STEP / 3 * 2];
        for i in 0..5 {
            object_writer.write_all(buf.as_slice()).await.unwrap();
            assert_eq!(object_writer.tell().await.unwrap(), (i + 1) * buf.len());
        }
        let res = Writer::shutdown(&mut object_writer).await.unwrap();
        assert_eq!(res.size, buf.len() * 5);
    }

    // Aborting before anything is uploaded must succeed as a no-op.
    #[tokio::test]
    async fn test_abort_write() {
        let store = LanceObjectStore::memory();

        let mut object_writer = ObjectWriter::new(&store, &Path::from("/foo"))
            .await
            .unwrap();
        object_writer.abort().await;
    }

    // Shutdown persists the temp file to the final path and records stats.
    #[tokio::test]
    async fn test_local_writer_shutdown() {
        let tmp = lance_core::utils::tempfile::TempStdDir::default();
        let file_path = tmp.join("test_local_writer.bin");
        let os_path = Path::from_absolute_path(&file_path).unwrap();
        let io_tracker = Arc::new(IOTracker::default());

        let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap();
        let temp_file_path = named_temp.path().to_owned();
        let (std_file, temp_path) = named_temp.into_parts();
        let file = tokio::fs::File::from_std(std_file);
        let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker.clone());

        let data = b"hello local writer";
        writer.write_all(data).await.unwrap();

        // Before shutdown the data only exists in the temp file.
        assert!(!file_path.exists());
        assert!(temp_file_path.exists());

        let result = Writer::shutdown(&mut writer).await.unwrap();
        assert_eq!(result.size, data.len());
        assert!(result.e_tag.is_some());
        assert!(!result.e_tag.as_ref().unwrap().is_empty());

        // After shutdown the temp file has been moved to the final path.
        assert!(file_path.exists());
        assert!(!temp_file_path.exists());

        let stats = io_tracker.stats();
        assert_eq!(stats.write_iops, 1);
        assert_eq!(stats.written_bytes, data.len() as u64);
    }

    // Dropping without shutdown must delete the temp file and never
    // create the final file.
    #[tokio::test]
    async fn test_local_writer_drop_cleans_up() {
        let tmp = lance_core::utils::tempfile::TempStdDir::default();
        let file_path = tmp.join("test_drop.bin");
        let os_path = Path::from_absolute_path(&file_path).unwrap();
        let io_tracker = Arc::new(IOTracker::default());

        let named_temp = tempfile::NamedTempFile::new_in(&*tmp).unwrap();
        let temp_file_path = named_temp.path().to_owned();
        let (std_file, temp_path) = named_temp.into_parts();
        let file = tokio::fs::File::from_std(std_file);
        let mut writer = LocalWriter::new(file, os_path, temp_path, io_tracker);

        writer.write_all(b"some data").await.unwrap();
        assert!(temp_file_path.exists());

        drop(writer);
        assert!(!temp_file_path.exists());
        assert!(!file_path.exists());
    }

    #[test]
    fn clamp_initial_upload_size_below_min_is_clamped_up() {
        assert_eq!(clamp_initial_upload_size(0), (INITIAL_UPLOAD_STEP, true));
        assert_eq!(
            clamp_initial_upload_size(INITIAL_UPLOAD_STEP - 1),
            (INITIAL_UPLOAD_STEP, true)
        );
    }

    #[test]
    fn clamp_initial_upload_size_within_range_is_unchanged() {
        assert_eq!(
            clamp_initial_upload_size(INITIAL_UPLOAD_STEP),
            (INITIAL_UPLOAD_STEP, false)
        );
        assert_eq!(
            clamp_initial_upload_size(MAX_UPLOAD_PART_SIZE),
            (MAX_UPLOAD_PART_SIZE, false)
        );
        let mid = INITIAL_UPLOAD_STEP * 8; assert_eq!(clamp_initial_upload_size(mid), (mid, false));
    }

    #[test]
    fn clamp_initial_upload_size_above_max_is_clamped_down() {
        assert_eq!(
            clamp_initial_upload_size(MAX_UPLOAD_PART_SIZE + 1),
            (MAX_UPLOAD_PART_SIZE, true)
        );
        assert_eq!(
            clamp_initial_upload_size(usize::MAX),
            (MAX_UPLOAD_PART_SIZE, true)
        );
    }
}