1use crate::apply::{self, ApplyConfig, ApplySession, Checkpoint, SequentialCheckpoint};
17use crate::chunk::{self, ZiPatchReader};
18use crate::{ApplyError, ApplyResult, ParseError};
19use std::io::{Read, Seek};
20use std::ops::ControlFlow;
21
22impl ApplyConfig {
23 pub fn apply_patch<R: Read>(self, reader: ZiPatchReader<R>) -> ApplyResult<()> {
31 self.into_session().apply_patch(reader)
32 }
33
34 pub fn resume_apply_patch<R: Read + Seek>(
40 self,
41 reader: ZiPatchReader<R>,
42 from: Option<&SequentialCheckpoint>,
43 ) -> ApplyResult<SequentialCheckpoint> {
44 self.into_session().resume_apply_patch(reader, from)
45 }
46}
47
48impl ApplySession {
49 pub fn apply_patch<R: Read>(&mut self, mut reader: ZiPatchReader<R>) -> ApplyResult<()> {
65 let span = tracing::info_span!(crate::tracing_schema::span_names::APPLY_PATCH);
66 let _enter = span.enter();
67 let started = std::time::Instant::now();
68 self.patch_name = reader.patch_name().map(str::to_owned);
69 self.patch_size = None;
70 let result = run_apply_loop(&mut reader, self, 0);
71 let flush_result = self.flush();
72 let (final_result, chunks_applied) = match (result, flush_result) {
73 (Ok(n), Ok(())) => (Ok(()), n),
74 (Ok(_), Err(e)) => (Err(ApplyError::from(e)), 0),
75 (Err(e), _) => (Err(e), 0),
76 };
77 if final_result.is_ok() {
78 tracing::info!(
79 chunks = chunks_applied,
80 bytes_read = reader.bytes_read(),
81 resumed_from_chunk = tracing::field::Empty,
82 elapsed_ms = started.elapsed().as_millis() as u64,
83 "apply_patch: patch applied"
84 );
85 }
86 final_result
87 }
88
89 #[allow(clippy::too_many_lines)]
100 pub fn resume_apply_patch<R: Read + Seek>(
101 &mut self,
102 mut reader: ZiPatchReader<R>,
103 from: Option<&SequentialCheckpoint>,
104 ) -> ApplyResult<SequentialCheckpoint> {
105 let span = tracing::info_span!(crate::tracing_schema::span_names::RESUME_APPLY_PATCH);
106 let _enter = span.enter();
107 let started = std::time::Instant::now();
108
109 if let Some(cp) = from {
110 if !cp
111 .schema_version
112 .compatible_with(SequentialCheckpoint::CURRENT_SCHEMA_VERSION)
113 {
114 return Err(ApplyError::SchemaVersionMismatch {
115 kind: "sequential-checkpoint",
116 found: cp.schema_version,
117 expected: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
118 });
119 }
120 }
121
122 let reader_name = reader.patch_name().map(str::to_owned);
123 let total_size = stream_total_size(&mut reader)?;
124 self.patch_name.clone_from(&reader_name);
125 self.patch_size = Some(total_size);
126
127 let effective_from = from.and_then(|cp| {
128 let name_match = cp.patch_name == reader_name;
129 let size_match = match cp.patch_size {
130 Some(sz) => sz == total_size,
131 None => true,
132 };
133 if name_match && size_match {
134 Some(cp)
135 } else {
136 tracing::warn!(
137 expected_patch_name = ?reader_name,
138 expected_patch_size = total_size,
139 checkpoint_patch_name = ?cp.patch_name,
140 checkpoint_patch_size = ?cp.patch_size,
141 "resume_apply_patch: stale checkpoint, restarting from chunk 0"
142 );
143 None
144 }
145 });
146
147 let resumed_from_chunk = effective_from.map(|cp| cp.next_chunk_index);
148 let skipped_bytes_at_start = effective_from.map_or(0, |cp| cp.bytes_read);
149 let has_in_flight = effective_from
150 .and_then(|cp| cp.in_flight.as_ref())
151 .is_some();
152
153 if let Some(cp) = effective_from {
154 tracing::info!(
155 patch_name = ?reader_name,
156 skipped_chunks = cp.next_chunk_index,
157 skipped_bytes = cp.bytes_read,
158 has_in_flight,
159 "resume_apply_patch: resuming patch"
160 );
161 fast_forward(&mut reader, cp.next_chunk_index, cp.bytes_read)?;
162 }
163
164 let start_index = effective_from.map_or(0, |cp| cp.next_chunk_index);
165 let in_flight = effective_from.and_then(|cp| cp.in_flight.clone());
166
167 let result: ApplyResult<u64> = (|| {
168 if let Some(in_flight) = in_flight {
169 resume_in_flight_chunk(&mut reader, self, start_index, &in_flight)?;
170 run_apply_loop(&mut reader, self, start_index + 1).map(|n| n + 1)
171 } else {
172 run_apply_loop(&mut reader, self, start_index)
173 }
174 })();
175
176 let flush_result = self.flush();
177 let (final_result, chunks_applied) = match (result, flush_result) {
178 (Ok(n), Ok(())) => (Ok(()), n),
179 (Ok(_), Err(e)) => (Err(ApplyError::from(e)), 0),
180 (Err(e), _) => (Err(e), 0),
181 };
182
183 match final_result {
184 Ok(()) => {
185 let bytes_read = reader.bytes_read();
186 if let Some(from_chunk) = resumed_from_chunk {
187 tracing::info!(
188 chunks = chunks_applied,
189 bytes_read,
190 resumed_from_chunk = from_chunk,
191 skipped_bytes = skipped_bytes_at_start,
192 elapsed_ms = started.elapsed().as_millis() as u64,
193 "resume_apply_patch: patch applied"
194 );
195 } else {
196 tracing::info!(
197 chunks = chunks_applied,
198 bytes_read,
199 resumed_from_chunk = tracing::field::Empty,
200 elapsed_ms = started.elapsed().as_millis() as u64,
201 "resume_apply_patch: patch applied"
202 );
203 }
204 Ok(SequentialCheckpoint {
205 schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
206 next_chunk_index: start_index + chunks_applied,
207 bytes_read,
208 patch_name: reader_name,
209 patch_size: Some(total_size),
210 in_flight: None,
211 })
212 }
213 Err(e) => Err(e),
214 }
215 }
216}
217
218fn run_apply_loop<R: Read>(
219 reader: &mut ZiPatchReader<R>,
220 session: &mut ApplySession,
221 start_index: u64,
222) -> ApplyResult<u64> {
223 let mut index = start_index;
224 while let Some(rec) = reader.next_chunk()? {
225 session.current_chunk_index = index;
226 session.current_chunk_bytes_read = rec.bytes_read;
227 rec.chunk.apply(session)?;
228 let bytes_read = rec.bytes_read;
229 let next_chunk_index = index + 1;
230 let checkpoint = Checkpoint::Sequential(SequentialCheckpoint {
231 schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
232 next_chunk_index,
233 bytes_read,
234 patch_name: session.patch_name.clone(),
235 patch_size: session.patch_size,
236 in_flight: None,
237 });
238 tracing::debug!(
239 next_chunk_index,
240 bytes_read,
241 in_flight = false,
242 "apply_patch: checkpoint recorded"
243 );
244 session.record_checkpoint(&checkpoint)?;
245 let event = apply::ChunkEvent {
246 index: index as usize,
247 kind: rec.tag,
248 bytes_read,
249 };
250 if let ControlFlow::Break(()) = session.observer_mut().on_chunk_applied(event) {
251 return Err(ApplyError::Cancelled);
252 }
253 if session.cancel_requested() {
254 return Err(ApplyError::Cancelled);
255 }
256 index += 1;
257 }
258 Ok(index - start_index)
259}
260
261fn stream_total_size<R: Read + Seek>(reader: &mut ZiPatchReader<R>) -> ApplyResult<u64> {
262 let inner = reader.inner_mut();
263 let current = inner.stream_position()?;
264 let end = inner.seek(std::io::SeekFrom::End(0))?;
265 inner.seek(std::io::SeekFrom::Start(current))?;
266 Ok(end)
267}
268
269fn fast_forward<R: Read>(
270 reader: &mut ZiPatchReader<R>,
271 target_chunks: u64,
272 expected_bytes_read: u64,
273) -> ApplyResult<()> {
274 let mut consumed: u64 = 0;
275 while consumed < target_chunks {
276 match reader.next_chunk()? {
277 Some(_) => consumed += 1,
278 None => {
279 return Err(ApplyError::Parse(ParseError::TruncatedPatch));
280 }
281 }
282 }
283 if reader.bytes_read() != expected_bytes_read {
284 tracing::warn!(
285 actual_bytes_read = reader.bytes_read(),
286 expected_bytes_read,
287 target_chunks,
288 "resume_apply_patch: bytes_read drift during fast-forward"
289 );
290 }
291 tracing::debug!(
292 skipped_chunks = target_chunks,
293 bytes_read = reader.bytes_read(),
294 "resume_apply_patch: fast-forward complete"
295 );
296 Ok(())
297}
298
299fn resume_in_flight_chunk<R: Read>(
300 reader: &mut ZiPatchReader<R>,
301 session: &mut ApplySession,
302 chunk_index: u64,
303 in_flight: &apply::InFlightAddFile,
304) -> ApplyResult<()> {
305 let Some(rec) = reader.next_chunk()? else {
306 return Err(ApplyError::Parse(ParseError::TruncatedPatch));
307 };
308
309 session.current_chunk_index = chunk_index;
310 session.current_chunk_bytes_read = rec.bytes_read;
311
312 let (start_block, start_bytes) = match resolve_in_flight_resume(&rec.chunk, session, in_flight)
313 {
314 InFlightResume::Resume {
315 start_block,
316 start_bytes,
317 } => (start_block, start_bytes),
318 InFlightResume::Restart => (0, 0),
319 };
320
321 match &rec.chunk {
322 chunk::Chunk::Sqpk(chunk::SqpkCommand::File(file))
323 if matches!(
324 file.operation,
325 crate::chunk::sqpk::SqpkFileOperation::AddFile
326 ) =>
327 {
328 apply::sqpk::apply_file_add_from(file, session, start_block, start_bytes)?;
329 }
330 _ => rec.chunk.apply(session)?,
331 }
332
333 let bytes_read = rec.bytes_read;
334 let tag = rec.tag;
335 let next_chunk_index = chunk_index + 1;
336 let checkpoint = Checkpoint::Sequential(SequentialCheckpoint {
337 schema_version: SequentialCheckpoint::CURRENT_SCHEMA_VERSION,
338 next_chunk_index,
339 bytes_read,
340 patch_name: session.patch_name.clone(),
341 patch_size: session.patch_size,
342 in_flight: None,
343 });
344 session.record_checkpoint(&checkpoint)?;
345 let event = apply::ChunkEvent {
346 index: chunk_index as usize,
347 kind: tag,
348 bytes_read,
349 };
350 if let ControlFlow::Break(()) = session.observer_mut().on_chunk_applied(event) {
351 return Err(ApplyError::Cancelled);
352 }
353 if session.cancel_requested() {
354 return Err(ApplyError::Cancelled);
355 }
356 Ok(())
357}
358
359enum InFlightResume {
360 Resume {
361 start_block: usize,
362 start_bytes: u64,
363 },
364 Restart,
365}
366
367fn resolve_in_flight_resume(
368 chunk: &chunk::Chunk,
369 session: &ApplySession,
370 in_flight: &apply::InFlightAddFile,
371) -> InFlightResume {
372 let chunk::Chunk::Sqpk(chunk::SqpkCommand::File(file)) = chunk else {
373 tracing::warn!(
374 "resume_apply_patch: in-flight chunk is not an SqpkFile; discarding in-flight state"
375 );
376 return InFlightResume::Restart;
377 };
378 if !matches!(
379 file.operation,
380 crate::chunk::sqpk::SqpkFileOperation::AddFile
381 ) {
382 tracing::warn!(
383 "resume_apply_patch: in-flight chunk is not an AddFile; discarding in-flight state"
384 );
385 return InFlightResume::Restart;
386 }
387
388 let expected_path = apply::path::generic_path(session, &file.path);
389 if expected_path != in_flight.target_path {
390 tracing::warn!(
391 chunk_path = %expected_path.display(),
392 in_flight_path = %in_flight.target_path.display(),
393 "resume_apply_patch: in-flight target path does not match chunk; discarding"
394 );
395 return InFlightResume::Restart;
396 }
397 let chunk_offset = file.file_offset;
398 if chunk_offset != in_flight.file_offset {
399 tracing::warn!(
400 chunk_offset,
401 in_flight_offset = in_flight.file_offset,
402 "resume_apply_patch: in-flight file_offset does not match chunk; discarding"
403 );
404 return InFlightResume::Restart;
405 }
406 if in_flight.block_idx as usize > file.blocks.len() {
407 tracing::warn!(
408 block_idx = in_flight.block_idx,
409 block_count = file.blocks.len(),
410 "resume_apply_patch: in-flight block_idx out of range; discarding"
411 );
412 return InFlightResume::Restart;
413 }
414 if chunk_offset == 0 && in_flight.bytes_into_target > 0 {
415 let on_disk_len = session
416 .vfs()
417 .metadata(&in_flight.target_path)
418 .map_or(0, |m| m.len);
419 if on_disk_len < in_flight.bytes_into_target {
420 tracing::warn!(
421 target = %in_flight.target_path.display(),
422 on_disk_len,
423 bytes_into_target = in_flight.bytes_into_target,
424 "resume_apply_patch: target file truncated or missing since checkpoint; restarting AddFile"
425 );
426 return InFlightResume::Restart;
427 }
428 }
429
430 InFlightResume::Resume {
431 start_block: in_flight.block_idx as usize,
432 start_bytes: in_flight.bytes_into_target,
433 }
434}