Skip to main content

zipatch_rs/apply/
driver.rs

1//! Sequential apply driver — bridges the parser [`ZiPatchReader`] with the
2//! apply layer's [`ApplySession`].
3//!
4//! The high-level entry points come in two forms:
5//!
6//! - [`ApplyConfig::apply_patch`] / [`ApplyConfig::resume_apply_patch`] —
7//!   consuming convenience methods that materialise a fresh session, run
8//!   the patch, and drop the session on return.
9//! - [`ApplySession::apply_patch`] / [`ApplySession::resume_apply_patch`] —
10//!   `&mut self` methods for callers driving multiple patches against the
11//!   same session.
12//!
13//! The chunk-loop machinery is private to this module; consumers should never
14//! need to write the dispatch loop by hand.
15
16use crate::apply::{self, ApplyConfig, ApplySession, Checkpoint, SequentialCheckpoint};
17use crate::chunk::{self, ZiPatchReader};
18use crate::{ApplyError, ApplyResult, ParseError};
19use std::io::{Read, Seek};
20use std::ops::ControlFlow;
21
22impl ApplyConfig {
23    /// Iterate every chunk in `reader` and apply each one against a freshly
24    /// materialised [`ApplySession`].
25    ///
26    /// Convenience wrapper around [`ApplyConfig::into_session`] followed by
27    /// [`ApplySession::apply_patch`]; the session is dropped on return.
28    ///
29    /// See [`ApplySession::apply_patch`] for the full error contract.
30    pub fn apply_patch<R: Read>(self, reader: ZiPatchReader<R>) -> ApplyResult<()> {
31        self.into_session().apply_patch(reader)
32    }
33
34    /// Resume a previously interrupted apply against a freshly materialised
35    /// [`ApplySession`].
36    ///
37    /// Convenience wrapper around [`ApplyConfig::into_session`] followed by
38    /// [`ApplySession::resume_apply_patch`]; the session is dropped on return.
39    pub fn resume_apply_patch<R: Read + Seek>(
40        self,
41        reader: ZiPatchReader<R>,
42        from: Option<&SequentialCheckpoint>,
43    ) -> ApplyResult<SequentialCheckpoint> {
44        self.into_session().resume_apply_patch(reader, from)
45    }
46}
47
48impl ApplySession {
49    /// Iterate every chunk in `reader` and apply each one to this session.
50    ///
51    /// This is the primary high-level entry point for applying a patch. It
52    /// drives the [`ZiPatchReader`] to completion, dispatching each
53    /// yielded [`Chunk`](crate::Chunk) through the apply layer in stream order.
54    ///
55    /// Chunks **must** be applied in order — the `ZiPatch` format is a
56    /// sequential log and later chunks may depend on filesystem state produced
57    /// by earlier ones.
58    ///
59    /// # Errors
60    ///
61    /// Stops at the first parse or apply error and returns it immediately.
62    /// Any filesystem changes already applied by earlier chunks are **not**
63    /// rolled back.
64    pub fn apply_patch<R: Read>(&mut self, mut reader: ZiPatchReader<R>) -> ApplyResult<()> {
65        let span = tracing::info_span!(crate::tracing_schema::span_names::APPLY_PATCH);
66        let _enter = span.enter();
67        let started = std::time::Instant::now();
68        self.patch_name = reader.patch_name().map(str::to_owned);
69        self.patch_size = None;
70        let result = run_apply_loop(&mut reader, self, 0);
71        let flush_result = self.flush();
72        let (final_result, chunks_applied) = match (result, flush_result) {
73            (Ok(n), Ok(())) => (Ok(()), n),
74            (Ok(_), Err(e)) => (Err(ApplyError::from(e)), 0),
75            (Err(e), _) => (Err(e), 0),
76        };
77        if final_result.is_ok() {
78            tracing::info!(
79                chunks = chunks_applied,
80                bytes_read = reader.bytes_read(),
81                resumed_from_chunk = tracing::field::Empty,
82                elapsed_ms = started.elapsed().as_millis() as u64,
83                "apply_patch: patch applied"
84            );
85        }
86        final_result
87    }
88
89    /// Resume a previously interrupted apply from a [`SequentialCheckpoint`].
90    ///
91    /// See [`ApplyConfig::resume_apply_patch`] for the conceptual overview
92    /// and the resume contract.
93    ///
94    /// # Errors
95    ///
96    /// Same vocabulary as [`Self::apply_patch`], plus
97    /// [`ApplyError::SchemaVersionMismatch`] when `from.schema_version` does
98    /// not equal [`SequentialCheckpoint::CURRENT_SCHEMA_VERSION`].
99    #[allow(clippy::too_many_lines)]
100    pub fn resume_apply_patch<R: Read + Seek>(
101        &mut self,
102        mut reader: ZiPatchReader<R>,
103        from: Option<&SequentialCheckpoint>,
104    ) -> ApplyResult<SequentialCheckpoint> {
105        let span = tracing::info_span!(crate::tracing_schema::span_names::RESUME_APPLY_PATCH);
106        let _enter = span.enter();
107        let started = std::time::Instant::now();
108
109        if let Some(cp) = from {
110            if !cp
111                .schema_version
112                .compatible_with(SequentialCheckpoint::CURRENT_SCHEMA_VERSION)
113            {
114                return Err(ApplyError::SchemaVersionMismatch {
115                    kind: "sequential-checkpoint",
116                    found: cp.schema_version,
117                    expected: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
118                });
119            }
120        }
121
122        let reader_name = reader.patch_name().map(str::to_owned);
123        let total_size = stream_total_size(&mut reader)?;
124        self.patch_name.clone_from(&reader_name);
125        self.patch_size = Some(total_size);
126
127        let effective_from = from.and_then(|cp| {
128            let name_match = cp.patch_name == reader_name;
129            let size_match = match cp.patch_size {
130                Some(sz) => sz == total_size,
131                None => true,
132            };
133            if name_match && size_match {
134                Some(cp)
135            } else {
136                tracing::warn!(
137                    expected_patch_name = ?reader_name,
138                    expected_patch_size = total_size,
139                    checkpoint_patch_name = ?cp.patch_name,
140                    checkpoint_patch_size = ?cp.patch_size,
141                    "resume_apply_patch: stale checkpoint, restarting from chunk 0"
142                );
143                None
144            }
145        });
146
147        let resumed_from_chunk = effective_from.map(|cp| cp.next_chunk_index);
148        let skipped_bytes_at_start = effective_from.map_or(0, |cp| cp.bytes_read);
149        let has_in_flight = effective_from
150            .and_then(|cp| cp.in_flight.as_ref())
151            .is_some();
152
153        if let Some(cp) = effective_from {
154            tracing::info!(
155                patch_name = ?reader_name,
156                skipped_chunks = cp.next_chunk_index,
157                skipped_bytes = cp.bytes_read,
158                has_in_flight,
159                "resume_apply_patch: resuming patch"
160            );
161            fast_forward(&mut reader, cp.next_chunk_index, cp.bytes_read)?;
162        }
163
164        let start_index = effective_from.map_or(0, |cp| cp.next_chunk_index);
165        let in_flight = effective_from.and_then(|cp| cp.in_flight.clone());
166
167        let result: ApplyResult<u64> = (|| {
168            if let Some(in_flight) = in_flight {
169                resume_in_flight_chunk(&mut reader, self, start_index, &in_flight)?;
170                run_apply_loop(&mut reader, self, start_index + 1).map(|n| n + 1)
171            } else {
172                run_apply_loop(&mut reader, self, start_index)
173            }
174        })();
175
176        let flush_result = self.flush();
177        let (final_result, chunks_applied) = match (result, flush_result) {
178            (Ok(n), Ok(())) => (Ok(()), n),
179            (Ok(_), Err(e)) => (Err(ApplyError::from(e)), 0),
180            (Err(e), _) => (Err(e), 0),
181        };
182
183        match final_result {
184            Ok(()) => {
185                let bytes_read = reader.bytes_read();
186                if let Some(from_chunk) = resumed_from_chunk {
187                    tracing::info!(
188                        chunks = chunks_applied,
189                        bytes_read,
190                        resumed_from_chunk = from_chunk,
191                        skipped_bytes = skipped_bytes_at_start,
192                        elapsed_ms = started.elapsed().as_millis() as u64,
193                        "resume_apply_patch: patch applied"
194                    );
195                } else {
196                    tracing::info!(
197                        chunks = chunks_applied,
198                        bytes_read,
199                        resumed_from_chunk = tracing::field::Empty,
200                        elapsed_ms = started.elapsed().as_millis() as u64,
201                        "resume_apply_patch: patch applied"
202                    );
203                }
204                Ok(SequentialCheckpoint {
205                    schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
206                    next_chunk_index: start_index + chunks_applied,
207                    bytes_read,
208                    patch_name: reader_name,
209                    patch_size: Some(total_size),
210                    in_flight: None,
211                })
212            }
213            Err(e) => Err(e),
214        }
215    }
216}
217
218fn run_apply_loop<R: Read>(
219    reader: &mut ZiPatchReader<R>,
220    session: &mut ApplySession,
221    start_index: u64,
222) -> ApplyResult<u64> {
223    let mut index = start_index;
224    while let Some(rec) = reader.next_chunk()? {
225        session.current_chunk_index = index;
226        session.current_chunk_bytes_read = rec.bytes_read;
227        rec.chunk.apply(session)?;
228        let bytes_read = rec.bytes_read;
229        let next_chunk_index = index + 1;
230        let checkpoint = Checkpoint::Sequential(SequentialCheckpoint {
231            schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
232            next_chunk_index,
233            bytes_read,
234            patch_name: session.patch_name.clone(),
235            patch_size: session.patch_size,
236            in_flight: None,
237        });
238        tracing::debug!(
239            next_chunk_index,
240            bytes_read,
241            in_flight = false,
242            "apply_patch: checkpoint recorded"
243        );
244        session.record_checkpoint(&checkpoint)?;
245        let event = apply::ChunkEvent {
246            index: index as usize,
247            kind: rec.tag,
248            bytes_read,
249        };
250        if let ControlFlow::Break(()) = session.observer_mut().on_chunk_applied(event) {
251            return Err(ApplyError::Cancelled);
252        }
253        if session.cancel_requested() {
254            return Err(ApplyError::Cancelled);
255        }
256        index += 1;
257    }
258    Ok(index - start_index)
259}
260
261fn stream_total_size<R: Read + Seek>(reader: &mut ZiPatchReader<R>) -> ApplyResult<u64> {
262    let inner = reader.inner_mut();
263    let current = inner.stream_position()?;
264    let end = inner.seek(std::io::SeekFrom::End(0))?;
265    inner.seek(std::io::SeekFrom::Start(current))?;
266    Ok(end)
267}
268
269fn fast_forward<R: Read>(
270    reader: &mut ZiPatchReader<R>,
271    target_chunks: u64,
272    expected_bytes_read: u64,
273) -> ApplyResult<()> {
274    let mut consumed: u64 = 0;
275    while consumed < target_chunks {
276        match reader.next_chunk()? {
277            Some(_) => consumed += 1,
278            None => {
279                return Err(ApplyError::Parse(ParseError::TruncatedPatch));
280            }
281        }
282    }
283    if reader.bytes_read() != expected_bytes_read {
284        tracing::warn!(
285            actual_bytes_read = reader.bytes_read(),
286            expected_bytes_read,
287            target_chunks,
288            "resume_apply_patch: bytes_read drift during fast-forward"
289        );
290    }
291    tracing::debug!(
292        skipped_chunks = target_chunks,
293        bytes_read = reader.bytes_read(),
294        "resume_apply_patch: fast-forward complete"
295    );
296    Ok(())
297}
298
299fn resume_in_flight_chunk<R: Read>(
300    reader: &mut ZiPatchReader<R>,
301    session: &mut ApplySession,
302    chunk_index: u64,
303    in_flight: &apply::InFlightAddFile,
304) -> ApplyResult<()> {
305    let Some(rec) = reader.next_chunk()? else {
306        return Err(ApplyError::Parse(ParseError::TruncatedPatch));
307    };
308
309    session.current_chunk_index = chunk_index;
310    session.current_chunk_bytes_read = rec.bytes_read;
311
312    let (start_block, start_bytes) = match resolve_in_flight_resume(&rec.chunk, session, in_flight)
313    {
314        InFlightResume::Resume {
315            start_block,
316            start_bytes,
317        } => (start_block, start_bytes),
318        InFlightResume::Restart => (0, 0),
319    };
320
321    match &rec.chunk {
322        chunk::Chunk::Sqpk(chunk::SqpkCommand::File(file))
323            if matches!(
324                file.operation,
325                crate::chunk::sqpk::SqpkFileOperation::AddFile
326            ) =>
327        {
328            apply::sqpk::apply_file_add_from(file, session, start_block, start_bytes)?;
329        }
330        _ => rec.chunk.apply(session)?,
331    }
332
333    let bytes_read = rec.bytes_read;
334    let tag = rec.tag;
335    let next_chunk_index = chunk_index + 1;
336    let checkpoint = Checkpoint::Sequential(SequentialCheckpoint {
337        schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
338        next_chunk_index,
339        bytes_read,
340        patch_name: session.patch_name.clone(),
341        patch_size: session.patch_size,
342        in_flight: None,
343    });
344    session.record_checkpoint(&checkpoint)?;
345    let event = apply::ChunkEvent {
346        index: chunk_index as usize,
347        kind: tag,
348        bytes_read,
349    };
350    if let ControlFlow::Break(()) = session.observer_mut().on_chunk_applied(event) {
351        return Err(ApplyError::Cancelled);
352    }
353    if session.cancel_requested() {
354        return Err(ApplyError::Cancelled);
355    }
356    Ok(())
357}
358
/// Decision on how to treat the in-flight state stored in a checkpoint.
enum InFlightResume {
    /// The recorded state was rejected; the chunk is applied from the start.
    Restart,
    /// The recorded state was accepted; continue from the given position.
    Resume {
        /// Index of the block to continue from.
        start_block: usize,
        /// Byte count taken from the checkpoint's `bytes_into_target`.
        start_bytes: u64,
    },
}
366
367fn resolve_in_flight_resume(
368    chunk: &chunk::Chunk,
369    session: &ApplySession,
370    in_flight: &apply::InFlightAddFile,
371) -> InFlightResume {
372    let chunk::Chunk::Sqpk(chunk::SqpkCommand::File(file)) = chunk else {
373        tracing::warn!(
374            "resume_apply_patch: in-flight chunk is not an SqpkFile; discarding in-flight state"
375        );
376        return InFlightResume::Restart;
377    };
378    if !matches!(
379        file.operation,
380        crate::chunk::sqpk::SqpkFileOperation::AddFile
381    ) {
382        tracing::warn!(
383            "resume_apply_patch: in-flight chunk is not an AddFile; discarding in-flight state"
384        );
385        return InFlightResume::Restart;
386    }
387
388    let expected_path = apply::path::generic_path(session, &file.path);
389    if expected_path != in_flight.target_path {
390        tracing::warn!(
391            chunk_path = %expected_path.display(),
392            in_flight_path = %in_flight.target_path.display(),
393            "resume_apply_patch: in-flight target path does not match chunk; discarding"
394        );
395        return InFlightResume::Restart;
396    }
397    let chunk_offset = file.file_offset;
398    if chunk_offset != in_flight.file_offset {
399        tracing::warn!(
400            chunk_offset,
401            in_flight_offset = in_flight.file_offset,
402            "resume_apply_patch: in-flight file_offset does not match chunk; discarding"
403        );
404        return InFlightResume::Restart;
405    }
406    if in_flight.block_idx as usize > file.blocks.len() {
407        tracing::warn!(
408            block_idx = in_flight.block_idx,
409            block_count = file.blocks.len(),
410            "resume_apply_patch: in-flight block_idx out of range; discarding"
411        );
412        return InFlightResume::Restart;
413    }
414    if chunk_offset == 0 && in_flight.bytes_into_target > 0 {
415        let on_disk_len = session
416            .vfs()
417            .metadata(&in_flight.target_path)
418            .map_or(0, |m| m.len);
419        if on_disk_len < in_flight.bytes_into_target {
420            tracing::warn!(
421                target = %in_flight.target_path.display(),
422                on_disk_len,
423                bytes_into_target = in_flight.bytes_into_target,
424                "resume_apply_patch: target file truncated or missing since checkpoint; restarting AddFile"
425            );
426            return InFlightResume::Restart;
427        }
428    }
429
430    InFlightResume::Resume {
431        start_block: in_flight.block_idx as usize,
432        start_bytes: in_flight.bytes_into_target,
433    }
434}