1use std::{
4 collections::BTreeMap,
5 fs::File,
6 io::{Cursor, Read, Seek},
7 path::Path,
8};
9
10use zip::{ZipArchive, result::ZipError};
11
12use crate::{
13 BeamModule, BeamSet, ContentHash, ExtractionLimits, Manifest, PackageError,
14 builder::is_safe_logical_name, content_hash, extraction::ExtractionBudget,
15 namespace::deployed_name, version::WorkflowVersion,
16};
17
/// Name of the archive entry holding the JSON manifest.
const MANIFEST_ENTRY: &str = "manifest.json";
/// Entries whose names start with this prefix are treated as compiled modules.
const BEAM_PREFIX: &str = "beam/";
/// Suffix every entry under [`BEAM_PREFIX`] must carry.
const BEAM_SUFFIX: &str = ".beam";
/// Entries whose names start with this prefix are treated as source files.
const SOURCE_PREFIX: &str = "src/";
/// Suffix every entry under [`SOURCE_PREFIX`] must carry.
const SOURCE_SUFFIX: &str = ".gleam";
23
24#[derive(Clone, Debug, PartialEq)]
30pub struct Package {
31 manifest: Manifest,
32 beams: BeamSet,
33 source: BTreeMap<String, Vec<u8>>,
34 content_hash: ContentHash,
35}
36
37impl Package {
38 pub fn load_from_path(
49 path: impl AsRef<Path>,
50 limits: ExtractionLimits,
51 ) -> Result<Self, PackageError> {
52 let file =
53 File::open(path).map_err(|source| PackageError::ArchiveRead(ZipError::Io(source)))?;
54 Self::load_from_reader(file, limits)
55 }
56
57 pub fn load_from_bytes(
68 bytes: impl AsRef<[u8]>,
69 limits: ExtractionLimits,
70 ) -> Result<Self, PackageError> {
71 Self::load_from_reader(Cursor::new(bytes.as_ref()), limits)
72 }
73
74 fn load_from_reader<R>(reader: R, limits: ExtractionLimits) -> Result<Self, PackageError>
75 where
76 R: Read + Seek,
77 {
78 let mut archive = ZipArchive::new(reader).map_err(PackageError::ArchiveRead)?;
79 let mut budget = limits.budget();
80 let manifest = read_manifest(&mut archive, &mut budget)?;
81 manifest.check_format_version()?;
82
83 let (beams, source) = read_archive_entries(&mut archive, &mut budget)?;
84 let content_hash = content_hash(&beams);
85 let computed = content_hash.to_string();
86 if manifest.version.as_str() != computed {
87 return Err(PackageError::IntegrityMismatch {
88 expected: manifest.version.as_str().to_owned(),
89 computed,
90 });
91 }
92
93 if beams.get(&manifest.entry_module).is_none() {
94 return Err(PackageError::MissingEntryModule {
95 module: manifest.entry_module.clone(),
96 });
97 }
98
99 Ok(Self {
100 manifest,
101 beams,
102 source,
103 content_hash,
104 })
105 }
106
107 #[must_use]
109 pub const fn manifest(&self) -> &Manifest {
110 &self.manifest
111 }
112
113 #[must_use]
115 pub const fn beams(&self) -> &BeamSet {
116 &self.beams
117 }
118
119 #[must_use]
121 pub const fn source(&self) -> &BTreeMap<String, Vec<u8>> {
122 &self.source
123 }
124
125 #[must_use]
127 pub const fn content_hash(&self) -> &ContentHash {
128 &self.content_hash
129 }
130
131 #[must_use]
133 pub fn version_record(&self) -> WorkflowVersion {
134 WorkflowVersion {
135 entry_module: self.manifest.entry_module.clone(),
136 content_hash: self.content_hash.clone(),
137 activities: self.manifest.activities.clone(),
138 input_schema: self.manifest.input_schema.clone(),
139 output_schema: self.manifest.output_schema.clone(),
140 }
141 }
142
143 #[must_use]
148 pub fn deployed_modules(&self) -> Vec<(String, &[u8])> {
149 self.beams
150 .iter()
151 .map(|module| {
152 (
153 deployed_name(module.name(), &self.content_hash),
154 module.bytes(),
155 )
156 })
157 .collect()
158 }
159
160 #[must_use]
162 pub fn deployed_entry_module(&self) -> String {
163 deployed_name(&self.manifest.entry_module, &self.content_hash)
164 }
165
166 pub fn to_archive_bytes(&self) -> Result<Vec<u8>, PackageError> {
182 crate::PackageBuilder::with_source(
183 self.manifest.clone(),
184 self.beams.clone(),
185 self.source.clone(),
186 )
187 .write_to_bytes()
188 }
189
190 #[cfg(any(test, feature = "test-support"))]
191 #[doc(hidden)]
192 #[must_use]
193 pub fn from_validated_parts_for_test(
194 manifest: Manifest,
195 beams: BeamSet,
196 source: BTreeMap<String, Vec<u8>>,
197 content_hash: ContentHash,
198 ) -> Self {
199 Self {
200 manifest,
201 beams,
202 source,
203 content_hash,
204 }
205 }
206}
207
208fn read_manifest<R>(
209 archive: &mut ZipArchive<R>,
210 budget: &mut ExtractionBudget,
211) -> Result<Manifest, PackageError>
212where
213 R: Read + Seek,
214{
215 let mut manifest_file = match archive.by_name(MANIFEST_ENTRY) {
216 Ok(file) => file,
217 Err(ZipError::FileNotFound) => return Err(PackageError::MissingManifest),
218 Err(error) => return Err(PackageError::ArchiveRead(error)),
219 };
220
221 let manifest_bytes = budget.read_entry(&mut manifest_file)?;
222
223 serde_json::from_slice(&manifest_bytes).map_err(|source| PackageError::ManifestParse { source })
224}
225
226fn read_archive_entries<R>(
227 archive: &mut ZipArchive<R>,
228 budget: &mut ExtractionBudget,
229) -> Result<(BeamSet, BTreeMap<String, Vec<u8>>), PackageError>
230where
231 R: Read + Seek,
232{
233 let mut modules = Vec::new();
234 let mut source = BTreeMap::new();
235
236 for index in 0..archive.len() {
237 let mut file = archive.by_index(index).map_err(PackageError::ArchiveRead)?;
238 if file.is_dir() {
239 continue;
240 }
241
242 let entry = file.name().to_owned();
243 if entry == MANIFEST_ENTRY {
244 continue;
245 }
246
247 if entry.starts_with(BEAM_PREFIX) {
248 let logical = logical_name_from_entry(&entry, BEAM_PREFIX, BEAM_SUFFIX)?;
249 let bytes = budget.read_entry(&mut file)?;
250 modules.push(BeamModule::new(logical, bytes));
251 } else if entry.starts_with(SOURCE_PREFIX) {
252 let logical = logical_name_from_entry(&entry, SOURCE_PREFIX, SOURCE_SUFFIX)?;
253 let bytes = budget.read_entry(&mut file)?;
254 if source.insert(logical, bytes).is_some() {
255 return Err(PackageError::MalformedBeamEntry { entry });
256 }
257 }
258 }
259
260 let beams = BeamSet::new(modules)?;
261 Ok((beams, source))
262}
263
264fn logical_name_from_entry(
265 entry: &str,
266 prefix: &str,
267 suffix: &str,
268) -> Result<String, PackageError> {
269 let Some(without_prefix) = entry.strip_prefix(prefix) else {
270 return Err(PackageError::MalformedBeamEntry {
271 entry: entry.to_owned(),
272 });
273 };
274 let Some(logical) = without_prefix.strip_suffix(suffix) else {
275 return Err(PackageError::MalformedBeamEntry {
276 entry: entry.to_owned(),
277 });
278 };
279
280 if is_safe_logical_name(logical) {
281 Ok(logical.to_owned())
282 } else {
283 Err(PackageError::MalformedBeamEntry {
284 entry: entry.to_owned(),
285 })
286 }
287}
288
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        fs,
        io::{Cursor, Write},
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    use serde_json::json;
    use zip::{CompressionMethod, ZipWriter, write::SimpleFileOptions};

    use super::Package;
    use crate::{
        BeamModule, BeamSet, CURRENT_FORMAT_VERSION, DeclaredActivity, ExtractionLimits, Manifest,
        ManifestVersion, PackageBuilder, PackageError, content_hash,
    };

    /// Manifest naming `workflow/order` as the entry module. Its version is a
    /// placeholder string, not a hash of any beam set.
    fn sample_manifest() -> Manifest {
        Manifest {
            entry_module: "workflow/order".to_owned(),
            entry_function: "run".to_owned(),
            input_schema: json!({ "type": "object" }),
            output_schema: json!({ "type": "object" }),
            timeout: Duration::from_secs(30),
            activities: vec![DeclaredActivity {
                activity_type: "charge_card".to_owned(),
            }],
            version: ManifestVersion::new("placeholder"),
            format_version: CURRENT_FORMAT_VERSION,
        }
    }

    /// Two-module beam set that includes the sample entry module.
    fn sample_beams() -> Result<BeamSet, PackageError> {
        BeamSet::new(vec![
            BeamModule::new("workflow/support", vec![4, 5, 6]),
            BeamModule::new("workflow/order", vec![1, 2, 3]),
        ])
    }

    /// Serialises the sample manifest with its version set to the content
    /// hash of `beams`, so the integrity check passes for that beam set.
    fn manifest_bytes_matching(beams: &BeamSet) -> Result<Vec<u8>, PackageError> {
        let mut manifest = sample_manifest();
        manifest.version = ManifestVersion::new(content_hash(beams).to_string());
        serde_json::to_vec(&manifest).map_err(|source| PackageError::ManifestSerialise { source })
    }

    /// Writes `entries` into an in-memory ZIP archive using `options` for
    /// every entry and returns the archive bytes.
    fn zip_with_options<I, N, B>(
        entries: I,
        options: SimpleFileOptions,
    ) -> Result<Vec<u8>, PackageError>
    where
        I: IntoIterator<Item = (N, B)>,
        N: ToString,
        B: AsRef<[u8]>,
    {
        let mut archive = ZipWriter::new(Cursor::new(Vec::new()));

        for (name, bytes) in entries {
            archive
                .start_file(name, options)
                .map_err(PackageError::ArchiveWrite)?;
            archive
                .write_all(bytes.as_ref())
                .map_err(|source| PackageError::ArchiveWriteIo { source })?;
        }

        let cursor = archive.finish().map_err(PackageError::ArchiveWrite)?;
        Ok(cursor.into_inner())
    }

    /// Builds an archive whose entries are stored without compression.
    fn write_zip<I, N, B>(entries: I) -> Result<Vec<u8>, PackageError>
    where
        I: IntoIterator<Item = (N, B)>,
        N: ToString,
        B: AsRef<[u8]>,
    {
        let options = SimpleFileOptions::default()
            .compression_method(CompressionMethod::Stored)
            .compression_level(None);
        zip_with_options(entries, options)
    }

    /// Builds an archive containing only a `manifest.json` entry.
    fn archive_with_manifest(manifest: &Manifest) -> Result<Vec<u8>, PackageError> {
        let manifest_bytes = serde_json::to_vec(manifest)
            .map_err(|source| PackageError::ManifestSerialise { source })?;
        write_zip([("manifest.json", manifest_bytes)])
    }

    /// Builds an archive whose entries use the `Deflated` compression method,
    /// so highly repetitive payloads shrink well below their inflated size.
    fn deflated_zip<I, N, B>(entries: I) -> Result<Vec<u8>, PackageError>
    where
        I: IntoIterator<Item = (N, B)>,
        N: ToString,
        B: AsRef<[u8]>,
    {
        let options = SimpleFileOptions::default().compression_method(CompressionMethod::Deflated);
        zip_with_options(entries, options)
    }

    /// A 4 MiB all-zero beam compresses to under the budget but must be
    /// refused on inflation, with the error reporting the configured limit.
    #[test]
    fn inflate_bomb_past_bounded_budget_is_refused_reporting_the_limit()
    -> Result<(), Box<dyn std::error::Error>> {
        const BUDGET: u64 = 65_536;
        let manifest_bytes = serde_json::to_vec(&sample_manifest())
            .map_err(|source| PackageError::ManifestSerialise { source })?;
        let bytes = deflated_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/order.beam", vec![0_u8; 4 * 1024 * 1024]),
        ])?;
        assert!(
            u64::try_from(bytes.len())? < BUDGET,
            "bomb must compress under the budget to model a sneaky upload: {} bytes",
            bytes.len()
        );

        let result = Package::load_from_bytes(&bytes, ExtractionLimits::bounded(BUDGET));

        assert!(matches!(
            result,
            Err(PackageError::InflatedSizeExceeded { limit: BUDGET })
        ));
        Ok(())
    }

    /// A budget equal to the total inflated size (manifest plus six beam
    /// bytes) loads; a budget one byte smaller is refused.
    #[test]
    fn package_on_exact_inflate_budget_loads_and_one_byte_under_refuses()
    -> Result<(), Box<dyn std::error::Error>> {
        let manifest_bytes = manifest_bytes_matching(&sample_beams()?)?;
        let entries = vec![
            ("manifest.json".to_owned(), manifest_bytes.clone()),
            ("beam/workflow/support.beam".to_owned(), vec![4, 5, 6]),
            ("beam/workflow/order.beam".to_owned(), vec![1, 2, 3]),
        ];
        let inflated_total = u64::try_from(manifest_bytes.len())? + 6;
        let bytes = write_zip(entries)?;

        let loaded = Package::load_from_bytes(&bytes, ExtractionLimits::bounded(inflated_total))?;
        assert_eq!(loaded.beams().len(), 2);

        let result =
            Package::load_from_bytes(&bytes, ExtractionLimits::bounded(inflated_total - 1));
        assert!(matches!(
            result,
            Err(PackageError::InflatedSizeExceeded { limit }) if limit == inflated_total - 1
        ));
        Ok(())
    }

    /// Bytes that are not a ZIP archive are reported as an archive read error.
    #[test]
    fn non_zip_input_returns_archive_read() {
        let result = Package::load_from_bytes(b"not a zip archive", ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::ArchiveRead(_))));
    }

    /// The first half of a valid archive is reported as an archive read error.
    #[test]
    fn truncated_zip_input_returns_archive_read() -> Result<(), PackageError> {
        let bytes = archive_with_manifest(&sample_manifest())?;
        let truncated = &bytes[..bytes.len() / 2];
        let result = Package::load_from_bytes(truncated, ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::ArchiveRead(_))));
        Ok(())
    }

    /// An archive without `manifest.json` yields `MissingManifest`.
    #[test]
    fn missing_manifest_returns_missing_manifest() -> Result<(), PackageError> {
        let bytes = write_zip([("beam/workflow/order.beam", vec![1, 2, 3])])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::MissingManifest)));
        Ok(())
    }

    /// A manifest entry that is not valid JSON yields `ManifestParse`.
    #[test]
    fn unparseable_manifest_returns_manifest_parse() -> Result<(), PackageError> {
        let bytes = write_zip([("manifest.json", b"not-json".to_vec())])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::ManifestParse { .. })));
        Ok(())
    }

    /// A format version beyond the current one is rejected and reported back.
    #[test]
    fn unknown_format_version_returns_exact_variant() -> Result<(), PackageError> {
        let mut manifest = sample_manifest();
        manifest.format_version = CURRENT_FORMAT_VERSION + 99;
        let bytes = archive_with_manifest(&manifest)?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::UnknownFormatVersion { found }) if found == CURRENT_FORMAT_VERSION + 99
        ));
        Ok(())
    }

    /// A `beam/` entry without the `.beam` suffix is rejected by name.
    #[test]
    fn malformed_beam_entry_returns_exact_variant() -> Result<(), PackageError> {
        let manifest_bytes = manifest_bytes_matching(&sample_beams()?)?;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/order.txt", vec![1, 2, 3]),
        ])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MalformedBeamEntry { entry }) if entry == "beam/workflow/order.txt"
        ));
        Ok(())
    }

    /// A `beam/` entry whose logical name contains `$` is rejected by name.
    #[test]
    fn beam_entry_with_deployed_name_separator_returns_malformed_entry() -> Result<(), PackageError>
    {
        let manifest_bytes = manifest_bytes_matching(&sample_beams()?)?;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/order$bad.beam", vec![1, 2, 3]),
        ])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MalformedBeamEntry { entry }) if entry == "beam/workflow/order$bad.beam"
        ));
        Ok(())
    }

    /// A `src/` entry without the `.gleam` suffix is rejected by name, even
    /// when the beams and manifest are otherwise valid.
    #[test]
    fn invalid_source_entry_returns_malformed_entry() -> Result<(), PackageError> {
        let beams = sample_beams()?;
        let mut entries = vec![
            ("manifest.json".to_owned(), manifest_bytes_matching(&beams)?),
            ("src/workflow/order.txt".to_owned(), b"source".to_vec()),
        ];
        entries.extend(beams.iter().map(|module| {
            (
                format!("beam/{}.beam", module.name()),
                module.bytes().to_vec(),
            )
        }));
        let result = Package::load_from_bytes(write_zip(entries)?, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MalformedBeamEntry { entry }) if entry == "src/workflow/order.txt"
        ));
        Ok(())
    }

    /// With a matching hash but no `workflow/order` beam, the loader reports
    /// the missing entry module rather than an integrity failure.
    #[test]
    fn missing_entry_module_returns_exact_variant_when_hash_matches() -> Result<(), PackageError> {
        let beams = BeamSet::new(vec![BeamModule::new("workflow/support", vec![4, 5, 6])])?;
        let manifest_bytes = manifest_bytes_matching(&beams)?;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/support.beam", vec![4, 5, 6]),
        ])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MissingEntryModule { module }) if module == "workflow/order"
        ));
        Ok(())
    }

    /// An archive written by `PackageBuilder` loads, and exposes its
    /// manifest, beams, source and a content hash equal to the manifest version.
    #[test]
    fn builder_produced_package_loads_successfully() -> Result<(), PackageError> {
        let bytes = PackageBuilder::with_source(
            sample_manifest(),
            sample_beams()?,
            BTreeMap::from([(
                "workflow/order".to_owned(),
                b"pub fn run() { Nil }".to_vec(),
            )]),
        )
        .write_to_bytes()?;

        let package = Package::load_from_bytes(bytes, ExtractionLimits::unbounded())?;

        assert_eq!(package.manifest().entry_module, "workflow/order");
        assert_eq!(package.beams().len(), 2);
        assert_eq!(
            package.source().get("workflow/order"),
            Some(&b"pub fn run() { Nil }".to_vec())
        );
        assert_eq!(
            package.content_hash().to_string(),
            package.manifest().version.as_str()
        );
        Ok(())
    }

    /// Re-serialising a loaded package and loading it again yields an equal
    /// package with the same canonical manifest digest.
    #[test]
    fn to_archive_bytes_round_trips_to_an_identical_package() -> Result<(), PackageError> {
        let bytes = PackageBuilder::with_source(
            sample_manifest(),
            sample_beams()?,
            BTreeMap::from([(
                "workflow/order".to_owned(),
                b"pub fn run() { Nil }".to_vec(),
            )]),
        )
        .write_to_bytes()?;
        let package = Package::load_from_bytes(bytes, ExtractionLimits::unbounded())?;

        let reloaded =
            Package::load_from_bytes(package.to_archive_bytes()?, ExtractionLimits::unbounded())?;

        assert_eq!(reloaded, package);
        assert_eq!(
            reloaded.manifest().canonical_digest()?,
            package.manifest().canonical_digest()?
        );
        Ok(())
    }

    /// A package written to a temporary file loads through `load_from_path`.
    #[test]
    fn load_from_path_loads_successfully() -> Result<(), Box<dyn std::error::Error>> {
        let bytes = PackageBuilder::new(sample_manifest(), sample_beams()?).write_to_bytes()?;
        let nanos = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
        // The process id is included so that concurrently running test
        // processes cannot pick the same file name from the clock alone.
        let path = std::env::temp_dir().join(format!(
            "aion-package-{}-{nanos}.aion",
            std::process::id()
        ));
        fs::write(&path, bytes)?;

        // Remove the file before propagating a load failure so a failing
        // test does not leave it behind.
        let package_result = Package::load_from_path(&path, ExtractionLimits::unbounded());
        let remove_result = fs::remove_file(&path);

        let package = package_result?;
        remove_result?;
        assert_eq!(package.manifest().entry_module, "workflow/order");
        Ok(())
    }
}
647}