opendal-core 0.57.0

Apache OpenDALâ„¢: One Layer, All Storage.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
- Proposal Name: `copier_api`
- Start Date: 2026-05-12
- RFC PR: [apache/opendal#7519]https://github.com/apache/opendal/pull/7519
- Tracking Issue: [apache/opendal#7521]https://github.com/apache/opendal/issues/7521

# Summary

Introduce a `Copier` API for long running copy operations.

`Operator::copy` will remain the simple run-to-completion API. `Operator::copier`
will create a stateful copy stream that users can drive step by step, observe
byte progress, and abort when needed.

# Motivation

OpenDAL currently exposes `copy` as a single future:

```rust
op.copy("from", "to").await?;
```

This works well for small objects and backends with native single-request copy.
It does not model large object copies well. Services have different large-copy
protocols:

- S3-compatible services use multipart copy for objects larger than the
  single-request copy limit.
- GCS uses rewrite tokens and may require multiple rewrite requests.
- Azure Blob can use block-copy style operations and commit the final block
  list.

Those protocols are stateful. They can make progress over many requests, expose
intermediate server-side state, and require explicit cleanup when aborted.

OpenDAL should support those protocols without exposing backend continuation
tokens to users. This matches the existing `Lister` design: pagination tokens
are stored in `PageContext` and are not part of the public API. Users drive the
operation by repeatedly asking for the next item.

# Guide-level explanation

Users who do not need control over the copy process keep using `copy`:

```rust
op.copy("from", "to").await?;
```

For large files or task-style workflows, users can create a `Copier`:

```rust
use futures::TryStreamExt;

let mut copier = op.copier("from", "to").await?;

let mut copied = 0usize;
while let Some(n) = copier.try_next().await? {
    copied += n;
    println!("copied {copied} bytes");
}
```

`Copier` is complete only after it returns `None`. A returned byte count means
the copy has made progress, but it is not a completion signal.

Users can abort an unfinished copy:

```rust
let mut copier = op.copier("from", "to").await?;

if let Err(err) = copier.try_next().await {
    let _ = copier.abort().await;
    return Err(err);
}
```

`abort` is needed because some backends create intermediate server-side state,
such as multipart uploads or uncommitted blocks. OpenDAL cannot perform async
cleanup from `Drop`, so explicit abort remains the user's responsibility when
they stop driving a copier before completion.

`copy_with` and `copier_with` share copy options:

```rust
let mut copier = op
    .copier_with("from", "to")
    .if_not_exists(true)
    .concurrent(8)
    .await?;

while copier.try_next().await?.is_some() {}
```

`if_not_exists` is a semantic option and must preserve the backend's atomic
destination condition. `concurrent` is an execution option for backends that can
split copy into server-side tasks. Backends that cannot copy concurrently can
ignore it.

# Reference-level explanation

## Public API

Add a public `Copier` type:

```rust
pub struct Copier {
    copier: Option<oio::Copier>,
    fut: Option<BoxedStaticFuture<(oio::Copier, Result<Option<usize>>)>>,
    errored: bool,
}
```

`Copier` implements:

```rust
impl Stream for Copier {
    type Item = Result<usize>;
}
```

The stream contract is:

- `Some(Ok(n))`: the copy operation made progress by `n` bytes.
- `Some(Err(err))`: the copy operation failed. Following stream polls return
  `None`.
- `None`: the copy operation is complete and the destination object has been
  committed.

`n` is a best-effort byte count for this step. Backends should return the real
increment when they can. Returning `0` is allowed when a backend can advance its
state without reporting a reliable byte delta.

`Copier` also exposes:

```rust
impl Copier {
    pub async fn abort(&mut self) -> Result<()>;
}
```

`abort` is only valid before the copier has completed or errored.

Add `Operator` APIs:

```rust
impl Operator {
    pub async fn copier(&self, from: &str, to: &str) -> Result<Copier>;

    pub fn copier_with(
        &self,
        from: &str,
        to: &str,
    ) -> FutureCopier<impl Future<Output = Result<Copier>>>;

    pub async fn copier_options(
        &self,
        from: &str,
        to: &str,
        opts: impl Into<options::CopyOptions>,
    ) -> Result<Copier>;
}
```

`Operator::copy` becomes a run-to-completion wrapper:

```rust
pub async fn copy(&self, from: &str, to: &str) -> Result<()> {
    let mut copier = self.copier(from, to).await?;

    loop {
        match copier.try_next().await {
            Ok(Some(_)) => continue,
            Ok(None) => return Ok(()),
            Err(err) => {
                let _ = copier.abort().await;
                return Err(err);
            }
        }
    }
}
```

## Options

`CopyOptions` will contain both semantic and execution options:

```rust
pub struct CopyOptions {
    pub if_not_exists: bool,
    pub concurrent: usize,
}
```

The raw layer splits this into operation arguments and copier execution options:

```rust
pub struct OpCopy {
    if_not_exists: bool,
}

pub struct OpCopier {
    concurrent: usize,
}
```

`if_not_exists` affects destination semantics. `concurrent` affects how many
copy tasks the copier may keep in flight.

## Raw API

Add `oio::Copy`:

```rust
pub type Copier = Box<dyn CopyDyn>;

pub trait Copy: Unpin + Send + Sync {
    fn next(&mut self) -> impl Future<Output = Result<Option<usize>>> + MaybeSend;

    fn abort(&mut self) -> impl Future<Output = Result<()>> + MaybeSend;
}
```

`next` drives the server-side copy state machine. `None` means that the final
commit has completed.

Add a dynamic version:

```rust
pub trait CopyDyn: Unpin + Send + Sync {
    fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<usize>>>;

    fn abort_dyn(&mut self) -> BoxedFuture<'_, Result<()>>;
}
```

Extend `Access`:

```rust
trait Access {
    type Copier: oio::Copy;

    fn copy(
        &self,
        from: &str,
        to: &str,
        args: OpCopy,
        opts: OpCopier,
    ) -> impl Future<Output = Result<(RpCopy, Self::Copier)>> + MaybeSend;
}
```

This changes the raw `copy` operation from "perform copy now" to "create a copy
state machine". The public `Operator::copy` preserves the current user-facing
behavior by draining the returned copier to completion.

## Capability

Keep the existing copy capability fields:

```rust
pub copy: bool,
pub copy_with_if_not_exists: bool,
```

Do not add public multipart-copy size constraints. Copy takes an existing source
object as input, so users do not need to plan request part sizes. Backends should
choose request sizes internally based on source size and service-specific
limits.

Backends that only support single-request copy can still expose a `Copier`; it
will be a one-step copier that performs the existing copy operation.

## Helper implementations

Add `oio::OneShotCopier` for existing backends:

```rust
pub trait OneShotCopy: Send + Sync + Unpin + 'static {
    fn copy_once(&self) -> impl Future<Output = Result<Option<usize>>> + MaybeSend;
}
```

`OneShotCopier` calls `copy_once` once, records completion, and returns `None`
after the copy has completed.

Add `oio::MultipartCopier` for S3-compatible services:

```rust
pub trait MultipartCopy: Send + Sync + Unpin + 'static {
    fn initiate_copy(&self) -> impl Future<Output = Result<String>> + MaybeSend;

    fn copy_part(
        &self,
        upload_id: &str,
        part_number: usize,
        range: BytesRange,
    ) -> impl Future<Output = Result<MultipartPart>> + MaybeSend;

    fn complete_copy(
        &self,
        upload_id: &str,
        parts: &[MultipartPart],
    ) -> impl Future<Output = Result<()>> + MaybeSend;

    fn abort_copy(&self, upload_id: &str) -> impl Future<Output = Result<()>> + MaybeSend;
}
```

`MultipartCopier` owns the source identity, destination path, upload id,
completed parts, next range, and concurrent task queue. It uses `OpCopier` to
choose concurrency and uses backend-private limits to choose part size.

`next` should keep the internal task queue full up to `concurrent`, wait for one
copy task to complete, record the completed part, and return the completed byte
count. This makes `Copier` capable of concurrent server-side copy without
exposing multipart details to users.

Other backends can implement `oio::Copy` directly when their protocol does not
fit multipart copy. GCS rewrite is the main example.

## Backend behavior

### S3-compatible services

S3-compatible services should use single-request copy below the service limit
and multipart copy above it.

Large copy flow:

1. Stat the source to get content length and source identity, such as ETag or
   version id when available.
2. Initiate multipart upload for the destination.
3. Fill the internal task queue with byte-range upload-part-copy requests up to
   the configured concurrency.
4. Return `Some(part_size)` when a part has been copied.
5. Complete multipart upload after all parts have been copied.
6. Return `None` after complete succeeds.

If a part fails after retries, `next` returns the error. `Operator::copy`
attempts `abort`; users driving `Copier` manually can call `abort`.

Resume across processes is not part of this RFC. Upload id and completed part
state stay inside the copier object.

### GCS

GCS should use rewrite for large copy.

The copier owns the rewrite token internally. Each `next` call issues one
rewrite request:

- If the response reports more bytes rewritten than the previous response,
  return the delta.
- If the response advances the token but does not provide a reliable delta,
  return `Some(0)`.
- If the response is done, return `None`.

The rewrite token is not exposed to users.

GCS should not use XML multipart upload for copy. OpenDAL's current GCS
multipart writer uploads client-provided request bodies through `UploadPart`.
It does not provide a server-side upload-part-copy operation that copies a byte
range from an existing source object. Using XML multipart upload for copy would
therefore require OpenDAL to read source ranges through the client and upload
them again, which is no longer an original server-side copy.

`concurrent` is best-effort for GCS. The rewrite protocol is token driven and
must be advanced by repeatedly passing the token returned by the previous
request. A GCS copier can ignore `concurrent` unless GCS exposes a compatible
server-side range-copy protocol in the future.

### Azure Blob

Azure Blob can use either service-side async copy or block-from-url style copy.
For a controllable copier, block-from-url plus final block-list commit is a
better fit:

1. Stat the source.
2. Generate block ids internally.
3. Copy each source range into an uncommitted block.
4. Return `Some(block_size)` for each copied block.
5. Commit the block list and return `None`.

If Azure async copy is used for a backend-specific reason, polling progress can
still be hidden inside `next`.

### Local filesystems

Local filesystem copy can use a one-shot copier at first. A future
implementation can copy in chunks if we need progress for very large local
files.

## Conditions and consistency

`if_not_exists` must remain an atomic destination condition. Backends must not
emulate it with `stat(to)` followed by an unconditional copy for multipart copy.
If a backend cannot preserve the condition for its large-copy protocol, it must
return `Unsupported`.

Backends should pin source identity when possible. For multipart copy, copied
parts must come from the same logical source object. If the source object changes
during copy and the backend can detect it, `next` should return
`ConditionNotMatch` or another suitable error instead of completing a mixed
object.

## Error handling

`next` should return temporary errors as temporary, so existing retry layers can
retry them.

After `next` returns an error:

- Public `Copier` marks itself errored and future stream polls return `None`.
- `abort` remains available to clean server-side intermediate state.
- `Operator::copy` attempts `abort` before returning the original error.

`abort` failure must not hide the original `copy` error in `Operator::copy`.

# Drawbacks

- Adds a new public API and a new raw IO trait.
- Changes the internal raw `copy` contract.
- Requires every backend with native copy to return a copier, even when the
  backend only needs a one-shot implementation.
- Progress is best effort. Some services can return `Some(0)` for state
  advancement without reliable byte deltas.

# Rationale and alternatives

## Hide large copy inside `Operator::copy`

We could keep only `Operator::copy` and implement multipart copy or rewrite
internally. This is simple for users, but bad for very large copies. Users cannot
control progress, cancellation, or task scheduling.

`Copier` keeps `copy` simple while allowing advanced users to drive long copies.

## Expose backend continuation tokens

We could expose S3 upload ids, GCS rewrite tokens, or Azure block lists as
checkpoints. This would make cross-process resume possible, but it would leak
backend protocols into the public API.

OpenDAL's `Lister` does not expose backend continuation tokens. It exposes
semantic controls like `start_after` and keeps service tokens inside
`PageContext`. `Copier` should follow the same abstraction boundary.

## Add `close`

`Writer` needs `close` because users provide an open-ended stream of data.
`Copier` does not. The source range is known when the copier is created, and the
backend knows when the copy is complete. Therefore `next` can perform the final
commit and return `None`.

## Return total copied bytes instead of per-step bytes

Returning total bytes would require users to understand whether a value is a
snapshot, a delta, or a backend-reported total. Returning per-step bytes keeps
the stream item simple. Users who need accumulated progress can sum the returned
values.

# Prior art

- OpenDAL `Lister` hides backend pagination tokens inside `PageContext`.
- OpenDAL `Writer` hides multipart upload details and exposes `write`, `close`,
  and `abort`.
- AWS S3 multipart copy splits source objects into copied parts and completes a
  multipart upload.
- GCS rewrite uses a rewrite token internally until the rewrite operation is
  done.
- Azure Blob block operations copy source ranges into blocks and commit a final
  block list.

# Unresolved questions

- Should `Copier` provide an inherent async `next` method, or only implement
  `Stream<Item = Result<usize>>` like `Lister`?
- Should `CopyOptions::concurrent` be added in this RFC, or should the first
  implementation use backend defaults only?
- Should successful `copy` return `Metadata` in the future, following
  `write_returns_metadata`?

# Future possibilities

## Durable resume

This RFC does not expose durable checkpoints. A future RFC can add a separate
task-oriented API if OpenDAL wants cross-process copy resume.

That design should not expose raw backend tokens directly. It should define a
portable checkpoint envelope with source identity, destination identity, backend
scheme, and backend-private payload.

## Copy progress metadata

If users need richer progress, OpenDAL can replace the stream item with a
structured type:

```rust
pub struct CopyProgress {
    pub copied: usize,
    pub total: Option<u64>,
}
```

This RFC starts with `usize` to keep the public surface minimal.

## Directory copier

This RFC only covers file copy. Recursive directory copy can be built later by
combining `Lister`, `Deleter`, and `Copier` with explicit concurrency control.