1use std::{
4 collections::BTreeMap,
5 fs::File,
6 io::{Cursor, Read, Seek},
7 path::Path,
8};
9
10use zip::{ZipArchive, result::ZipError};
11
12use crate::{
13 BeamModule, BeamSet, ContentHash, ExtractionLimits, Manifest, PackageError,
14 builder::is_safe_logical_name, content_hash, extraction::ExtractionBudget,
15 namespace::deployed_name, version::WorkflowVersion,
16};
17
/// Name of the archive entry from which the package manifest is read.
const MANIFEST_ENTRY: &str = "manifest.json";
/// Entry-name prefix identifying beam module entries inside the archive.
const BEAM_PREFIX: &str = "beam/";
/// Entry-name suffix every beam module entry must carry.
const BEAM_SUFFIX: &str = ".beam";
/// Entry-name prefix identifying source entries inside the archive.
const SOURCE_PREFIX: &str = "src/";
/// Entry-name suffix every source entry must carry.
const SOURCE_SUFFIX: &str = ".gleam";
23
/// A package archive that has been read and validated.
///
/// Values are produced by [`Package::load_from_path`] or
/// [`Package::load_from_bytes`], which check the manifest format version, the
/// content hash, and the presence of the entry module before constructing one.
#[derive(Clone, Debug, PartialEq)]
pub struct Package {
    /// Manifest parsed from the archive's `manifest.json` entry.
    manifest: Manifest,
    /// Beam modules read from the archive's `beam/` entries.
    beams: BeamSet,
    /// Source files read from the archive's `src/` entries, keyed by logical name.
    source: BTreeMap<String, Vec<u8>>,
    /// Content hash computed over `beams`.
    content_hash: ContentHash,
}
36
37impl Package {
38 pub fn load_from_path(
49 path: impl AsRef<Path>,
50 limits: ExtractionLimits,
51 ) -> Result<Self, PackageError> {
52 let file =
53 File::open(path).map_err(|source| PackageError::ArchiveRead(ZipError::Io(source)))?;
54 Self::load_from_reader(file, limits)
55 }
56
57 pub fn load_from_bytes(
68 bytes: impl AsRef<[u8]>,
69 limits: ExtractionLimits,
70 ) -> Result<Self, PackageError> {
71 Self::load_from_reader(Cursor::new(bytes.as_ref()), limits)
72 }
73
74 fn load_from_reader<R>(reader: R, limits: ExtractionLimits) -> Result<Self, PackageError>
75 where
76 R: Read + Seek,
77 {
78 let mut archive = ZipArchive::new(reader).map_err(PackageError::ArchiveRead)?;
79 let mut budget = limits.budget();
80 let manifest = read_manifest(&mut archive, &mut budget)?;
81 manifest.check_format_version()?;
82
83 let (beams, source) = read_archive_entries(&mut archive, &mut budget)?;
84 let content_hash = content_hash(&beams);
85 let computed = content_hash.to_string();
86 if manifest.version.as_str() != computed {
87 return Err(PackageError::IntegrityMismatch {
88 expected: manifest.version.as_str().to_owned(),
89 computed,
90 });
91 }
92
93 if beams.get(&manifest.entry_module).is_none() {
94 return Err(PackageError::MissingEntryModule {
95 module: manifest.entry_module.clone(),
96 });
97 }
98
99 Ok(Self {
100 manifest,
101 beams,
102 source,
103 content_hash,
104 })
105 }
106
107 #[must_use]
109 pub const fn manifest(&self) -> &Manifest {
110 &self.manifest
111 }
112
113 #[must_use]
115 pub const fn beams(&self) -> &BeamSet {
116 &self.beams
117 }
118
119 #[must_use]
121 pub const fn source(&self) -> &BTreeMap<String, Vec<u8>> {
122 &self.source
123 }
124
125 #[must_use]
127 pub const fn content_hash(&self) -> &ContentHash {
128 &self.content_hash
129 }
130
131 #[must_use]
133 pub fn version_record(&self) -> WorkflowVersion {
134 WorkflowVersion {
135 entry_module: self.manifest.entry_module.clone(),
136 content_hash: self.content_hash.clone(),
137 activities: self.manifest.activities.clone(),
138 input_schema: self.manifest.input_schema.clone(),
139 output_schema: self.manifest.output_schema.clone(),
140 }
141 }
142
143 #[must_use]
148 pub fn deployed_modules(&self) -> Vec<(String, &[u8])> {
149 self.beams
150 .iter()
151 .map(|module| {
152 (
153 deployed_name(module.name(), &self.content_hash),
154 module.bytes(),
155 )
156 })
157 .collect()
158 }
159
160 #[must_use]
162 pub fn deployed_entry_module(&self) -> String {
163 deployed_name(&self.manifest.entry_module, &self.content_hash)
164 }
165
166 #[cfg(any(test, feature = "test-support"))]
167 #[doc(hidden)]
168 #[must_use]
169 pub fn from_validated_parts_for_test(
170 manifest: Manifest,
171 beams: BeamSet,
172 source: BTreeMap<String, Vec<u8>>,
173 content_hash: ContentHash,
174 ) -> Self {
175 Self {
176 manifest,
177 beams,
178 source,
179 content_hash,
180 }
181 }
182}
183
184fn read_manifest<R>(
185 archive: &mut ZipArchive<R>,
186 budget: &mut ExtractionBudget,
187) -> Result<Manifest, PackageError>
188where
189 R: Read + Seek,
190{
191 let mut manifest_file = match archive.by_name(MANIFEST_ENTRY) {
192 Ok(file) => file,
193 Err(ZipError::FileNotFound) => return Err(PackageError::MissingManifest),
194 Err(error) => return Err(PackageError::ArchiveRead(error)),
195 };
196
197 let manifest_bytes = budget.read_entry(&mut manifest_file)?;
198
199 serde_json::from_slice(&manifest_bytes).map_err(|source| PackageError::ManifestParse { source })
200}
201
202fn read_archive_entries<R>(
203 archive: &mut ZipArchive<R>,
204 budget: &mut ExtractionBudget,
205) -> Result<(BeamSet, BTreeMap<String, Vec<u8>>), PackageError>
206where
207 R: Read + Seek,
208{
209 let mut modules = Vec::new();
210 let mut source = BTreeMap::new();
211
212 for index in 0..archive.len() {
213 let mut file = archive.by_index(index).map_err(PackageError::ArchiveRead)?;
214 if file.is_dir() {
215 continue;
216 }
217
218 let entry = file.name().to_owned();
219 if entry == MANIFEST_ENTRY {
220 continue;
221 }
222
223 if entry.starts_with(BEAM_PREFIX) {
224 let logical = logical_name_from_entry(&entry, BEAM_PREFIX, BEAM_SUFFIX)?;
225 let bytes = budget.read_entry(&mut file)?;
226 modules.push(BeamModule::new(logical, bytes));
227 } else if entry.starts_with(SOURCE_PREFIX) {
228 let logical = logical_name_from_entry(&entry, SOURCE_PREFIX, SOURCE_SUFFIX)?;
229 let bytes = budget.read_entry(&mut file)?;
230 if source.insert(logical, bytes).is_some() {
231 return Err(PackageError::MalformedBeamEntry { entry });
232 }
233 }
234 }
235
236 let beams = BeamSet::new(modules)?;
237 Ok((beams, source))
238}
239
240fn logical_name_from_entry(
241 entry: &str,
242 prefix: &str,
243 suffix: &str,
244) -> Result<String, PackageError> {
245 let Some(without_prefix) = entry.strip_prefix(prefix) else {
246 return Err(PackageError::MalformedBeamEntry {
247 entry: entry.to_owned(),
248 });
249 };
250 let Some(logical) = without_prefix.strip_suffix(suffix) else {
251 return Err(PackageError::MalformedBeamEntry {
252 entry: entry.to_owned(),
253 });
254 };
255
256 if is_safe_logical_name(logical) {
257 Ok(logical.to_owned())
258 } else {
259 Err(PackageError::MalformedBeamEntry {
260 entry: entry.to_owned(),
261 })
262 }
263}
264
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        fs,
        io::{Cursor, Write},
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    use serde_json::json;
    use zip::{CompressionMethod, ZipWriter, write::SimpleFileOptions};

    use super::Package;
    use crate::{
        BeamModule, BeamSet, CURRENT_FORMAT_VERSION, DeclaredActivity, ExtractionLimits, Manifest,
        ManifestVersion, PackageBuilder, PackageError, content_hash,
    };

    /// Manifest fixture whose entry module is `workflow/order`.
    ///
    /// Its version is a placeholder, so it does not match any beam set.
    fn sample_manifest() -> Manifest {
        Manifest {
            entry_module: "workflow/order".to_owned(),
            entry_function: "run".to_owned(),
            input_schema: json!({ "type": "object" }),
            output_schema: json!({ "type": "object" }),
            timeout: Duration::from_secs(30),
            activities: vec![DeclaredActivity {
                activity_type: "charge_card".to_owned(),
            }],
            version: ManifestVersion::new("placeholder"),
            format_version: CURRENT_FORMAT_VERSION,
        }
    }

    /// Beam set fixture holding `workflow/support` and `workflow/order`.
    fn sample_beams() -> Result<BeamSet, PackageError> {
        BeamSet::new(vec![
            BeamModule::new("workflow/support", vec![4, 5, 6]),
            BeamModule::new("workflow/order", vec![1, 2, 3]),
        ])
    }

    /// Returns the sample manifest with its version set to the content hash of `beams`.
    fn manifest_matching(beams: &BeamSet) -> Manifest {
        let mut manifest = sample_manifest();
        manifest.version = ManifestVersion::new(content_hash(beams).to_string());
        manifest
    }

    /// Serialises `manifest` to JSON bytes.
    fn manifest_json(manifest: &Manifest) -> Result<Vec<u8>, PackageError> {
        serde_json::to_vec(manifest).map_err(|source| PackageError::ManifestSerialise { source })
    }

    /// Writes `entries` into an in-memory zip archive using `options` for every entry.
    ///
    /// Shared by `write_zip` and `deflated_zip`, which differ only in the
    /// file options they pass.
    fn write_zip_with<I, N, B>(
        entries: I,
        options: SimpleFileOptions,
    ) -> Result<Vec<u8>, PackageError>
    where
        I: IntoIterator<Item = (N, B)>,
        N: ToString,
        B: AsRef<[u8]>,
    {
        let mut archive = ZipWriter::new(Cursor::new(Vec::new()));

        for (name, bytes) in entries {
            archive
                .start_file(name, options)
                .map_err(PackageError::ArchiveWrite)?;
            archive
                .write_all(bytes.as_ref())
                .map_err(|source| PackageError::ArchiveWriteIo { source })?;
        }

        let cursor = archive.finish().map_err(PackageError::ArchiveWrite)?;
        Ok(cursor.into_inner())
    }

    /// Builds a zip archive whose entries use the `Stored` compression method.
    fn write_zip<I, N, B>(entries: I) -> Result<Vec<u8>, PackageError>
    where
        I: IntoIterator<Item = (N, B)>,
        N: ToString,
        B: AsRef<[u8]>,
    {
        let options = SimpleFileOptions::default()
            .compression_method(CompressionMethod::Stored)
            .compression_level(None);
        write_zip_with(entries, options)
    }

    /// Builds a stored zip archive containing only `manifest` as `manifest.json`.
    fn archive_with_manifest(manifest: &Manifest) -> Result<Vec<u8>, PackageError> {
        write_zip([("manifest.json", manifest_json(manifest)?)])
    }

    /// Builds a zip archive whose entries use the `Deflated` compression method.
    fn deflated_zip<I, N, B>(entries: I) -> Result<Vec<u8>, PackageError>
    where
        I: IntoIterator<Item = (N, B)>,
        N: ToString,
        B: AsRef<[u8]>,
    {
        let options = SimpleFileOptions::default().compression_method(CompressionMethod::Deflated);
        write_zip_with(entries, options)
    }

    /// A 4 MiB all-zero beam that deflates to less than the budget must be
    /// refused, with the configured budget reported as the limit.
    #[test]
    fn inflate_bomb_past_bounded_budget_is_refused_reporting_the_limit()
    -> Result<(), Box<dyn std::error::Error>> {
        const BUDGET: u64 = 65_536;
        let bytes = deflated_zip([
            ("manifest.json", manifest_json(&sample_manifest())?),
            ("beam/workflow/order.beam", vec![0_u8; 4 * 1024 * 1024]),
        ])?;
        assert!(
            u64::try_from(bytes.len())? < BUDGET,
            "bomb must compress under the budget to model a sneaky upload: {} bytes",
            bytes.len()
        );

        let result = Package::load_from_bytes(&bytes, ExtractionLimits::bounded(BUDGET));

        assert!(matches!(
            result,
            Err(PackageError::InflatedSizeExceeded { limit: BUDGET })
        ));
        Ok(())
    }

    /// A budget equal to the total inflated size loads; one byte less is refused.
    #[test]
    fn package_on_exact_inflate_budget_loads_and_one_byte_under_refuses()
    -> Result<(), Box<dyn std::error::Error>> {
        let manifest_bytes = manifest_json(&manifest_matching(&sample_beams()?))?;
        // Manifest bytes plus the two three-byte beam payloads below.
        let inflated_total = u64::try_from(manifest_bytes.len())? + 6;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/support.beam", vec![4, 5, 6]),
            ("beam/workflow/order.beam", vec![1, 2, 3]),
        ])?;

        let loaded = Package::load_from_bytes(&bytes, ExtractionLimits::bounded(inflated_total))?;
        assert_eq!(loaded.beams().len(), 2);

        let result =
            Package::load_from_bytes(&bytes, ExtractionLimits::bounded(inflated_total - 1));
        assert!(matches!(
            result,
            Err(PackageError::InflatedSizeExceeded { limit }) if limit == inflated_total - 1
        ));
        Ok(())
    }

    /// Bytes that are not a zip archive are reported as an archive read error.
    #[test]
    fn non_zip_input_returns_archive_read() {
        let result = Package::load_from_bytes(b"not a zip archive", ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::ArchiveRead(_))));
    }

    /// An archive cut in half is reported as an archive read error.
    #[test]
    fn truncated_zip_input_returns_archive_read() -> Result<(), PackageError> {
        let bytes = archive_with_manifest(&sample_manifest())?;
        let truncated = &bytes[..bytes.len() / 2];
        let result = Package::load_from_bytes(truncated, ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::ArchiveRead(_))));
        Ok(())
    }

    /// An archive without `manifest.json` is reported as a missing manifest.
    #[test]
    fn missing_manifest_returns_missing_manifest() -> Result<(), PackageError> {
        let bytes = write_zip([("beam/workflow/order.beam", vec![1, 2, 3])])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::MissingManifest)));
        Ok(())
    }

    /// A manifest entry that is not valid JSON is reported as a parse error.
    #[test]
    fn unparseable_manifest_returns_manifest_parse() -> Result<(), PackageError> {
        let bytes = write_zip([("manifest.json", b"not-json".to_vec())])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(result, Err(PackageError::ManifestParse { .. })));
        Ok(())
    }

    /// A format version other than the current one is rejected and echoed back.
    #[test]
    fn unknown_format_version_returns_exact_variant() -> Result<(), PackageError> {
        let mut manifest = sample_manifest();
        manifest.format_version = CURRENT_FORMAT_VERSION + 99;
        let bytes = archive_with_manifest(&manifest)?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::UnknownFormatVersion { found }) if found == CURRENT_FORMAT_VERSION + 99
        ));
        Ok(())
    }

    /// A `beam/` entry without the `.beam` suffix is reported as malformed.
    #[test]
    fn malformed_beam_entry_returns_exact_variant() -> Result<(), PackageError> {
        let manifest_bytes = manifest_json(&manifest_matching(&sample_beams()?))?;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/order.txt", vec![1, 2, 3]),
        ])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MalformedBeamEntry { entry }) if entry == "beam/workflow/order.txt"
        ));
        Ok(())
    }

    /// A beam entry whose logical name contains `$` is reported as malformed.
    #[test]
    fn beam_entry_with_deployed_name_separator_returns_malformed_entry() -> Result<(), PackageError>
    {
        let manifest_bytes = manifest_json(&manifest_matching(&sample_beams()?))?;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/order$bad.beam", vec![1, 2, 3]),
        ])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MalformedBeamEntry { entry }) if entry == "beam/workflow/order$bad.beam"
        ));
        Ok(())
    }

    /// A `src/` entry without the `.gleam` suffix is reported as malformed.
    #[test]
    fn invalid_source_entry_returns_malformed_entry() -> Result<(), PackageError> {
        let beams = sample_beams()?;
        let mut entries = vec![
            (
                "manifest.json".to_owned(),
                manifest_json(&manifest_matching(&beams))?,
            ),
            ("src/workflow/order.txt".to_owned(), b"source".to_vec()),
        ];
        entries.extend(beams.iter().map(|module| {
            (
                format!("beam/{}.beam", module.name()),
                module.bytes().to_vec(),
            )
        }));
        let result = Package::load_from_bytes(write_zip(entries)?, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MalformedBeamEntry { entry }) if entry == "src/workflow/order.txt"
        ));
        Ok(())
    }

    /// With a matching hash but no beam for the entry module, loading reports
    /// the missing entry module by name.
    #[test]
    fn missing_entry_module_returns_exact_variant_when_hash_matches() -> Result<(), PackageError> {
        let beams = BeamSet::new(vec![BeamModule::new("workflow/support", vec![4, 5, 6])])?;
        let manifest_bytes = manifest_json(&manifest_matching(&beams))?;
        let bytes = write_zip([
            ("manifest.json", manifest_bytes),
            ("beam/workflow/support.beam", vec![4, 5, 6]),
        ])?;
        let result = Package::load_from_bytes(bytes, ExtractionLimits::unbounded());

        assert!(matches!(
            result,
            Err(PackageError::MissingEntryModule { module }) if module == "workflow/order"
        ));
        Ok(())
    }

    /// An archive written by `PackageBuilder` round-trips through the loader.
    #[test]
    fn builder_produced_package_loads_successfully() -> Result<(), PackageError> {
        let bytes = PackageBuilder::with_source(
            sample_manifest(),
            sample_beams()?,
            BTreeMap::from([(
                "workflow/order".to_owned(),
                b"pub fn run() { Nil }".to_vec(),
            )]),
        )
        .write_to_bytes()?;

        let package = Package::load_from_bytes(bytes, ExtractionLimits::unbounded())?;

        assert_eq!(package.manifest().entry_module, "workflow/order");
        assert_eq!(package.beams().len(), 2);
        assert_eq!(
            package.source().get("workflow/order"),
            Some(&b"pub fn run() { Nil }".to_vec())
        );
        assert_eq!(
            package.content_hash().to_string(),
            package.manifest().version.as_str()
        );
        Ok(())
    }

    /// A package written to a temporary file loads through `load_from_path`.
    #[test]
    fn load_from_path_loads_successfully() -> Result<(), Box<dyn std::error::Error>> {
        let bytes = PackageBuilder::new(sample_manifest(), sample_beams()?).write_to_bytes()?;
        let nanos = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
        let path = std::env::temp_dir().join(format!("aion-package-{nanos}.aion"));
        fs::write(&path, bytes)?;

        // Remove the file before inspecting the load result so a failed load
        // does not leave it behind.
        let package_result = Package::load_from_path(&path, ExtractionLimits::unbounded());
        let remove_result = fs::remove_file(&path);

        let package = package_result?;
        remove_result?;
        assert_eq!(package.manifest().entry_module, "workflow/order");
        Ok(())
    }
}
599}