1use std::collections::BTreeMap;
2use std::io::Cursor;
3
4use arrow_array::{Array, BinaryArray};
5use image::ImageFormat;
6
7use hirn_core::metadata::Metadata;
8use hirn_core::types::{AgentId, Namespace};
9use hirn_core::{
10 DerivedArtifact, DerivedArtifactKind, EvidenceRole, HydrationMode, LogicalResourceId,
11 ModalityProfile, ResourceGovernanceState, ResourceId, ResourceLocation, ResourceObject,
12 ResourceQuotaPolicy, ResourceQuotaScope, ResourceRetentionAction, ResourceRetentionPolicy,
13 ResourceRevisionId, RevisionOperation, Timestamp,
14};
15
16use crate::HirnDbError;
17use crate::datasets::{derived_artifact as artifact_ds, resource_blob as blob_ds, resource_object};
18use crate::mutation_envelope_ops::{
19 MutationEnvelopeRecord, MutationEnvelopeState, list_pending_mutation_envelopes,
20 update_mutation_envelope_state,
21};
22use crate::store::{PhysicalStore, ScanOptions};
23
/// Mutation-envelope kind used to record (and later reconcile) resource head
/// transitions produced by supersession.
pub const RESOURCE_HEAD_TRANSITION_KIND: &str = "resource_head_transition";
25
26#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27struct ResourceHeadTransitionEnvelope {
28 current_id: ResourceId,
29 successor_id: ResourceId,
30 successor_created_at_ms: i64,
31}
32
33#[derive(Debug, Clone, Default, PartialEq)]
35pub struct ResourceSupersession {
36 pub reason: Option<String>,
37 pub modality: Option<ModalityProfile>,
38 pub mime_type: Option<String>,
39 pub display_name: Option<String>,
40 pub checksum: Option<String>,
41 pub size_bytes: Option<u64>,
42 pub location: Option<ResourceLocation>,
43 pub metadata: Option<Metadata>,
44}
45
/// Optional annotations supplied when redacting or purging a resource.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ResourceGovernanceUpdate {
    /// Human-readable reason recorded for the governance action.
    pub reason: Option<String>,
    /// Display name to use in place of the original — presumably shown on the
    /// governed revision; confirm against `govern_resource`.
    pub placeholder_display_name: Option<String>,
}
52
53#[derive(Debug, Clone, PartialEq)]
55pub struct HydratedResource {
56 pub resource: ResourceObject,
57 pub artifacts: Vec<DerivedArtifact>,
58 pub blob: Option<Vec<u8>>,
59}
60
/// Counters reported by [`apply_resource_retention_policy`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ResourceRetentionApplyResult {
    /// Active heads inspected.
    pub scanned_active_heads: usize,
    /// Heads that were redacted or purged by this run.
    pub governed_resources: usize,
    /// Heads redacted by this run.
    pub redacted_resources: usize,
    /// Heads purged by this run.
    pub purged_resources: usize,
    /// Heads matched by the policy but already in the target state (or purged).
    pub skipped_resources: usize,
}
70
71pub async fn persist_resource(
76 store: &dyn PhysicalStore,
77 resource: ResourceObject,
78 blob: Option<Vec<u8>>,
79) -> Result<ResourceObject, HirnDbError> {
80 persist_resource_inner(store, resource, blob, None).await
81}
82
83pub async fn persist_resource_with_quota_policy(
85 store: &dyn PhysicalStore,
86 resource: ResourceObject,
87 blob: Option<Vec<u8>>,
88 quota_policy: &ResourceQuotaPolicy,
89) -> Result<ResourceObject, HirnDbError> {
90 persist_resource_inner(store, resource, blob, Some(quota_policy)).await
91}
92
93pub fn build_configured_blob_resource<F>(
95 namespace: Namespace,
96 owner_agent_id: AgentId,
97 modality: ModalityProfile,
98 mime_type: Option<&str>,
99 data: &[u8],
100 configure: F,
101) -> Result<ResourceObject, HirnDbError>
102where
103 F: FnOnce(
104 hirn_core::resource::ResourceObjectBuilder,
105 ) -> hirn_core::resource::ResourceObjectBuilder,
106{
107 let checksum = format!("blake3:{}", blake3::hash(data).to_hex());
108 let mut builder = ResourceObject::builder()
109 .modality(modality)
110 .checksum(checksum)
111 .size_bytes(data.len() as u64)
112 .location(ResourceLocation::Blob { blob_index: 0 })
113 .owner_agent_id(owner_agent_id)
114 .namespace(namespace);
115 if let Some(mime_type) = mime_type {
116 builder = builder.mime_type(mime_type);
117 }
118
119 configure(builder)
120 .build()
121 .map_err(|error| HirnDbError::InvalidArgument(error.to_string()))
122}
123
124pub fn configure_audio_resource_builder(
126 builder: hirn_core::resource::ResourceObjectBuilder,
127 duration_ms: u64,
128 channel_count: Option<u16>,
129) -> hirn_core::resource::ResourceObjectBuilder {
130 let mut builder = builder.metadata_entry(
131 "duration_ms",
132 i64::try_from(duration_ms).unwrap_or(i64::MAX),
133 );
134 if let Some(channel_count) = channel_count {
135 builder = builder.metadata_entry("channel_count", i64::from(channel_count));
136 }
137 builder
138}
139
140async fn persist_resource_inner(
141 store: &dyn PhysicalStore,
142 resource: ResourceObject,
143 blob: Option<Vec<u8>>,
144 quota_policy: Option<&ResourceQuotaPolicy>,
145) -> Result<ResourceObject, HirnDbError> {
146 let mut resource = resource;
147 let blob = prepare_blob_payload(store, None, &mut resource, blob).await?;
148
149 if let Some(checksum) = resource.checksum.as_deref()
150 && let Some(existing) =
151 find_live_resource_by_checksum(store, resource.namespace, checksum).await?
152 {
153 return Ok(existing);
154 }
155
156 if let Some(quota_policy) = quota_policy {
157 enforce_resource_quota_policy(store, &resource, None, quota_policy).await?;
158 }
159
160 append_resource_revision(store, &resource, blob).await?;
161
162 Ok(resource)
163}
164
165pub async fn list_resource_revisions(
167 store: &dyn PhysicalStore,
168 resource_id: ResourceId,
169) -> Result<Vec<ResourceObject>, HirnDbError> {
170 let Some(resource) = get_resource_raw(store, resource_id).await? else {
171 return Ok(Vec::new());
172 };
173
174 list_resource_revisions_for_logical_id(store, resource.logical_resource_id).await
175}
176
177pub async fn get_resource_head(
179 store: &dyn PhysicalStore,
180 resource_id: ResourceId,
181) -> Result<Option<ResourceObject>, HirnDbError> {
182 let revisions = list_resource_revisions(store, resource_id).await?;
183 Ok(select_active_resource_head(&revisions))
184}
185
186pub async fn supersede_resource(
188 store: &dyn PhysicalStore,
189 resource_id: ResourceId,
190 supersession: ResourceSupersession,
191 blob: Option<Vec<u8>>,
192) -> Result<ResourceObject, HirnDbError> {
193 supersede_resource_inner(store, resource_id, supersession, blob, None).await
194}
195
196pub async fn supersede_resource_with_quota_policy(
198 store: &dyn PhysicalStore,
199 resource_id: ResourceId,
200 supersession: ResourceSupersession,
201 blob: Option<Vec<u8>>,
202 quota_policy: &ResourceQuotaPolicy,
203) -> Result<ResourceObject, HirnDbError> {
204 supersede_resource_inner(store, resource_id, supersession, blob, Some(quota_policy)).await
205}
206
207async fn supersede_resource_inner(
208 store: &dyn PhysicalStore,
209 resource_id: ResourceId,
210 supersession: ResourceSupersession,
211 blob: Option<Vec<u8>>,
212 quota_policy: Option<&ResourceQuotaPolicy>,
213) -> Result<ResourceObject, HirnDbError> {
214 let Some(current) = get_resource_head(store, resource_id).await? else {
215 return Err(HirnDbError::InvalidArgument(format!(
216 "resource not found: {resource_id}"
217 )));
218 };
219
220 let now = Timestamp::now();
221 let mut successor = build_successor_revision(
222 ¤t,
223 normalize_optional_string(supersession.reason),
224 now,
225 );
226
227 if let Some(modality) = supersession.modality {
228 successor.modality = modality;
229 }
230 if let Some(mime_type) = supersession.mime_type {
231 successor.mime_type = Some(mime_type);
232 }
233 if let Some(display_name) = supersession.display_name {
234 successor.display_name = Some(display_name);
235 }
236 if let Some(checksum) = supersession.checksum {
237 successor.checksum = Some(checksum);
238 }
239 if let Some(size_bytes) = supersession.size_bytes {
240 successor.size_bytes = size_bytes;
241 }
242 if let Some(location) = supersession.location {
243 successor.location = location;
244 }
245 if let Some(metadata) = supersession.metadata {
246 successor.metadata = metadata;
247 }
248
249 let blob = prepare_blob_payload(store, Some(¤t), &mut successor, blob).await?;
250 if let Some(quota_policy) = quota_policy {
251 enforce_resource_quota_policy(store, &successor, Some(¤t), quota_policy).await?;
252 }
253 let envelope = build_resource_head_transition_envelope(¤t, &successor)?;
254 crate::mutation_envelope_ops::append_mutation_envelope(store, &envelope).await?;
255 if let Err(error) = append_resource_revision(store, &successor, blob).await {
256 let _ = mark_resource_head_transition_failed(store, &envelope.id, &error).await;
257 return Err(error);
258 }
259
260 let mut updated_current = current.clone();
261 updated_current.superseded_by = Some(successor.id);
262 updated_current.updated_at = now;
263 if let Err(error) = upsert_resource_revision(store, &updated_current).await {
264 rollback_resource_revision(store, &successor).await;
265 let _ = mark_resource_head_transition_failed(store, &envelope.id, &error).await;
266 return Err(error);
267 }
268
269 update_mutation_envelope_state(store, &envelope.id, MutationEnvelopeState::Applied, None)
270 .await?;
271
272 Ok(successor)
273}
274
275pub async fn reconcile_resource_head_mutations(
276 store: &dyn PhysicalStore,
277) -> Result<usize, HirnDbError> {
278 let envelopes =
279 list_pending_mutation_envelopes(store, Some(RESOURCE_HEAD_TRANSITION_KIND)).await?;
280 let mut reconciled = 0usize;
281
282 for envelope in envelopes {
283 match reconcile_single_resource_head_transition(store, &envelope).await {
284 Ok(true) => reconciled += 1,
285 Ok(false) => {}
286 Err(error) => {
287 let _ = mark_resource_head_transition_failed(store, &envelope.id, &error).await;
288 }
289 }
290 }
291
292 Ok(reconciled)
293}
294
295pub async fn reconcile_pending_resource_blob_staging(
306 store: &dyn PhysicalStore,
307) -> Result<usize, HirnDbError> {
308 let filter = "storage_ready = false".to_string();
312 let batches = store
313 .scan(
314 resource_object::DATASET_NAME,
315 ScanOptions {
316 filter: Some(filter),
317 exact_filter: None,
318 columns: None,
319 order_by: None,
320 limit: None,
321 offset: None,
322 },
323 )
324 .await?;
325
326 let mut staging_records: Vec<ResourceObject> = Vec::new();
327 for batch in &batches {
328 staging_records.extend(
329 resource_object::from_batch(batch)?
330 .into_iter()
331 .filter(|r| !r.storage_ready),
332 );
333 }
334
335 if staging_records.is_empty() {
336 return Ok(0);
337 }
338
339 let mut reconciled = 0_usize;
340 for mut resource in staging_records {
341 let blob_index = match resource.location {
342 ResourceLocation::Blob { blob_index } => blob_index,
343 _ => {
344 resource.storage_ready = true;
346 let _ = upsert_resource_revision(store, &resource).await;
347 reconciled += 1;
348 continue;
349 }
350 };
351
352 match load_resource_blob_unchecked(store, resource.id, blob_index).await {
354 Ok(_) => {
355 resource.storage_ready = true;
357 match upsert_resource_revision(store, &resource).await {
358 Ok(()) => {
359 tracing::debug!(
360 resource_id = %resource.id,
361 "reconciled staging resource: blob present, finalized"
362 );
363 reconciled += 1;
364 }
365 Err(error) => {
366 tracing::warn!(
367 resource_id = %resource.id,
368 %error,
369 "reconcile: failed to finalize staged resource"
370 );
371 }
372 }
373 }
374 Err(_) => {
375 let filter = format!("id = '{}'", resource.id);
378 if let Err(error) = store.delete(resource_object::DATASET_NAME, &filter).await {
379 tracing::warn!(
380 resource_id = %resource.id,
381 %error,
382 "reconcile: failed to delete dangling staging resource row"
383 );
384 } else {
385 tracing::debug!(
386 resource_id = %resource.id,
387 "reconcile: deleted dangling staging resource (no blob found)"
388 );
389 reconciled += 1;
390 }
391 }
392 }
393 }
394
395 Ok(reconciled)
396}
397
398fn build_resource_head_transition_envelope(
399 current: &ResourceObject,
400 successor: &ResourceObject,
401) -> Result<MutationEnvelopeRecord, HirnDbError> {
402 let payload = ResourceHeadTransitionEnvelope {
403 current_id: current.id,
404 successor_id: successor.id,
405 successor_created_at_ms: successor.created_at.timestamp_ms(),
406 };
407
408 let payload = serde_json::to_vec(&payload).map_err(|error| {
409 HirnDbError::InvalidArgument(format!("resource head envelope serialize: {error}"))
410 })?;
411
412 Ok(MutationEnvelopeRecord::pending(
413 format!("resource-head:{}", successor.id),
414 RESOURCE_HEAD_TRANSITION_KIND,
415 payload,
416 ))
417}
418
419async fn reconcile_single_resource_head_transition(
420 store: &dyn PhysicalStore,
421 envelope: &MutationEnvelopeRecord,
422) -> Result<bool, HirnDbError> {
423 let payload: ResourceHeadTransitionEnvelope = serde_json::from_slice(&envelope.payload)
424 .map_err(|error| {
425 HirnDbError::InvalidArgument(format!("resource head envelope deserialize: {error}"))
426 })?;
427
428 let current = get_resource_raw(store, payload.current_id).await?;
429 let successor = get_resource_raw(store, payload.successor_id).await?;
430
431 match (current, successor) {
432 (Some(current), Some(successor)) if current.superseded_by == Some(successor.id) => {
433 update_mutation_envelope_state(
434 store,
435 &envelope.id,
436 MutationEnvelopeState::Applied,
437 None,
438 )
439 .await?;
440 Ok(false)
441 }
442 (Some(mut current), Some(successor)) => {
443 current.superseded_by = Some(successor.id);
444 current.updated_at = Timestamp::from_millis(
445 u64::try_from(payload.successor_created_at_ms).map_err(|_| {
446 HirnDbError::InvalidArgument(
447 "resource head envelope successor_created_at_ms was negative".into(),
448 )
449 })?,
450 );
451 upsert_resource_revision(store, ¤t).await?;
452 update_mutation_envelope_state(
453 store,
454 &envelope.id,
455 MutationEnvelopeState::Applied,
456 None,
457 )
458 .await?;
459 Ok(true)
460 }
461 (Some(mut current), None) => {
462 if current.superseded_by == Some(payload.successor_id) {
463 current.superseded_by = None;
464 upsert_resource_revision(store, ¤t).await?;
465 }
466 mark_resource_head_transition_failed(
467 store,
468 &envelope.id,
469 &HirnDbError::InvalidArgument(format!(
470 "resource head recovery missing successor revision: {}",
471 payload.successor_id
472 )),
473 )
474 .await?;
475 Ok(true)
476 }
477 (None, Some(successor)) => {
478 rollback_resource_revision(store, &successor).await;
479 mark_resource_head_transition_failed(
480 store,
481 &envelope.id,
482 &HirnDbError::InvalidArgument(format!(
483 "resource head recovery missing current revision: {}",
484 payload.current_id
485 )),
486 )
487 .await?;
488 Ok(true)
489 }
490 (None, None) => {
491 mark_resource_head_transition_failed(
492 store,
493 &envelope.id,
494 &HirnDbError::InvalidArgument(format!(
495 "resource head recovery missing both revisions: {} -> {}",
496 payload.current_id, payload.successor_id
497 )),
498 )
499 .await?;
500 Ok(false)
501 }
502 }
503}
504
505async fn mark_resource_head_transition_failed(
506 store: &dyn PhysicalStore,
507 envelope_id: &str,
508 error: &HirnDbError,
509) -> Result<(), HirnDbError> {
510 update_mutation_envelope_state(
511 store,
512 envelope_id,
513 MutationEnvelopeState::Failed,
514 Some(error.to_string()),
515 )
516 .await
517}
518
519pub async fn redact_resource(
521 store: &dyn PhysicalStore,
522 resource_id: ResourceId,
523 update: ResourceGovernanceUpdate,
524) -> Result<ResourceObject, HirnDbError> {
525 govern_resource(
526 store,
527 resource_id,
528 ResourceGovernanceState::Redacted,
529 update,
530 )
531 .await
532}
533
534pub async fn purge_resource(
536 store: &dyn PhysicalStore,
537 resource_id: ResourceId,
538 update: ResourceGovernanceUpdate,
539) -> Result<ResourceObject, HirnDbError> {
540 govern_resource(store, resource_id, ResourceGovernanceState::Purged, update).await
541}
542
543pub async fn apply_resource_retention_policy(
545 store: &dyn PhysicalStore,
546 policy: &ResourceRetentionPolicy,
547) -> Result<ResourceRetentionApplyResult, HirnDbError> {
548 if policy.is_empty() {
549 return Ok(ResourceRetentionApplyResult::default());
550 }
551
552 let mut result = ResourceRetentionApplyResult::default();
553 for resource in list_active_resource_heads(store).await? {
554 result.scanned_active_heads += 1;
555
556 let Some(action) = policy.strongest_action_for(&resource) else {
557 continue;
558 };
559
560 if resource.governance_state == governance_state_for_action(action)
561 || resource.governance_state == ResourceGovernanceState::Purged
562 {
563 result.skipped_resources += 1;
564 continue;
565 }
566
567 let update = ResourceGovernanceUpdate {
568 reason: Some(format!("retention policy {}", action.as_str())),
569 placeholder_display_name: None,
570 };
571 match action {
572 ResourceRetentionAction::Redact => {
573 redact_resource(store, resource.id, update).await?;
574 result.redacted_resources += 1;
575 }
576 ResourceRetentionAction::Purge => {
577 purge_resource(store, resource.id, update).await?;
578 result.purged_resources += 1;
579 }
580 }
581 result.governed_resources += 1;
582 }
583
584 Ok(result)
585}
586
587pub async fn persist_derived_artifact(
589 store: &dyn PhysicalStore,
590 artifact: DerivedArtifact,
591) -> Result<(), HirnDbError> {
592 let batch = artifact_ds::to_batch(std::slice::from_ref(&artifact))?;
593 store.append(artifact_ds::DATASET_NAME, batch).await
594}
595
/// Source material from which default derived artifacts are generated.
#[derive(Clone, Copy, Debug, Default)]
pub struct DerivedArtifactInput<'a> {
    /// Text used for text artifacts; it is trimmed before use.
    pub text_content: &'a str,
    /// Raw payload bytes used for blob artifacts such as thumbnails.
    pub blob_bytes: Option<&'a [u8]>,
    /// Declared MIME type of `blob_bytes`, if known.
    pub mime_type: Option<&'a str>,
}

impl<'a> DerivedArtifactInput<'a> {
    /// Creates an input carrying only `text_content`.
    #[must_use]
    pub const fn new(text_content: &'a str) -> Self {
        Self {
            text_content,
            blob_bytes: None,
            mime_type: None,
        }
    }

    /// Attaches blob bytes and their MIME type, replacing any previous values.
    #[must_use]
    pub const fn with_blob(self, blob_bytes: &'a [u8], mime_type: Option<&'a str>) -> Self {
        Self {
            blob_bytes: Some(blob_bytes),
            mime_type,
            ..self
        }
    }
}
621
622#[derive(Debug, Clone, Copy, PartialEq, Eq)]
623struct DefaultTextArtifactPlan {
624 kind: DerivedArtifactKind,
625 record_failure: bool,
626}
627
628impl DefaultTextArtifactPlan {
629 const fn new(kind: DerivedArtifactKind, record_failure: bool) -> Self {
630 Self {
631 kind,
632 record_failure,
633 }
634 }
635}
636
637#[derive(Debug, Clone, Copy, PartialEq, Eq)]
638struct DefaultBlobArtifactPlan {
639 kind: DerivedArtifactKind,
640 record_failure: bool,
641}
642
643impl DefaultBlobArtifactPlan {
644 const fn new(kind: DerivedArtifactKind, record_failure: bool) -> Self {
645 Self {
646 kind,
647 record_failure,
648 }
649 }
650}
651
652const IMAGE_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 2] = [
653 DefaultTextArtifactPlan::new(DerivedArtifactKind::Caption, true),
654 DefaultTextArtifactPlan::new(DerivedArtifactKind::OcrText, false),
655];
656const IMAGE_SOURCE_BLOB_ARTIFACTS: [DefaultBlobArtifactPlan; 1] = [DefaultBlobArtifactPlan::new(
657 DerivedArtifactKind::Thumbnail,
658 true,
659)];
660const AUDIO_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] = [DefaultTextArtifactPlan::new(
661 DerivedArtifactKind::Transcript,
662 true,
663)];
664const CODE_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] = [DefaultTextArtifactPlan::new(
665 DerivedArtifactKind::SyntaxSummary,
666 true,
667)];
668const STRUCTURED_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] =
669 [DefaultTextArtifactPlan::new(
670 DerivedArtifactKind::SchemaSummary,
671 true,
672 )];
673const PREVIEW_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] = [DefaultTextArtifactPlan::new(
674 DerivedArtifactKind::Preview,
675 true,
676)];
677const NO_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 0] = [];
678const NO_BLOB_ARTIFACTS: [DefaultBlobArtifactPlan; 0] = [];
679const THUMBNAIL_MAX_DIMENSION_PX: u32 = 256;
680
681pub async fn persist_default_derived_artifacts(
683 store: &dyn PhysicalStore,
684 resource: &ResourceObject,
685 role: EvidenceRole,
686 input: DerivedArtifactInput<'_>,
687) -> Result<Vec<DerivedArtifact>, HirnDbError> {
688 let text_plans = default_text_artifact_plan(resource.modality, role);
689 let blob_plans = default_blob_artifact_plan(resource.modality, role);
690 if text_plans.is_empty() && blob_plans.is_empty() {
691 return Ok(Vec::new());
692 }
693
694 let mut known_artifacts = list_derived_artifacts(store, resource.id).await?;
695 let text_content = input.text_content.trim();
696 let mut created = Vec::new();
697
698 for plan in text_plans {
699 if known_artifacts
700 .iter()
701 .any(|artifact| artifact.kind == plan.kind)
702 || known_artifacts
703 .iter()
704 .any(|artifact| artifact_failure_matches(artifact, plan.kind))
705 {
706 continue;
707 }
708
709 let artifact = if text_content.is_empty() {
710 if !plan.record_failure {
711 continue;
712 }
713
714 build_generation_failure_artifact(resource, role, plan.kind, "source text was empty")?
715 } else {
716 let mut builder = DerivedArtifact::builder()
717 .resource_id(resource.id)
718 .kind(plan.kind)
719 .modality(ModalityProfile::Text)
720 .text_content(text_content)
721 .namespace(resource.namespace);
722
723 if resource.modality == ModalityProfile::Image
724 && role == EvidenceRole::Source
725 && plan.kind == DerivedArtifactKind::OcrText
726 {
727 builder = builder
728 .metadata_entry("generation_strategy", "text_surrogate_fallback")
729 .metadata_entry("fallback_source", "image_description");
730 }
731
732 builder
733 .build()
734 .map_err(|error| HirnDbError::InvalidArgument(error.to_string()))?
735 };
736
737 persist_derived_artifact(store, artifact.clone()).await?;
738 known_artifacts.push(artifact.clone());
739 created.push(artifact);
740 }
741
742 for plan in blob_plans {
743 if known_artifacts
744 .iter()
745 .any(|artifact| artifact.kind == plan.kind)
746 || known_artifacts
747 .iter()
748 .any(|artifact| artifact_failure_matches(artifact, plan.kind))
749 {
750 continue;
751 }
752
753 let artifact = match input.blob_bytes {
754 Some(blob_bytes) if !blob_bytes.is_empty() => {
755 match build_binary_derived_artifact(
756 resource,
757 plan.kind,
758 blob_bytes,
759 input.mime_type,
760 &known_artifacts,
761 ) {
762 Ok((artifact, blob_bytes)) => {
763 persist_derived_artifact_with_blob(store, artifact.clone(), blob_bytes)
764 .await?;
765 artifact
766 }
767 Err(error) if plan.record_failure => {
768 let failure =
769 build_generation_failure_artifact(resource, role, plan.kind, &error)?;
770 persist_derived_artifact(store, failure.clone()).await?;
771 failure
772 }
773 Err(_) => continue,
774 }
775 }
776 _ if plan.record_failure => {
777 let failure = build_generation_failure_artifact(
778 resource,
779 role,
780 plan.kind,
781 "source blob was unavailable",
782 )?;
783 persist_derived_artifact(store, failure.clone()).await?;
784 failure
785 }
786 _ => continue,
787 };
788
789 known_artifacts.push(artifact.clone());
790 created.push(artifact);
791 }
792
793 Ok(created)
794}
795
796#[must_use]
798pub const fn derived_artifact_evidence_role(kind: DerivedArtifactKind) -> EvidenceRole {
799 match kind {
800 DerivedArtifactKind::Preview | DerivedArtifactKind::Thumbnail => EvidenceRole::Preview,
801 DerivedArtifactKind::OcrText
802 | DerivedArtifactKind::Transcript
803 | DerivedArtifactKind::Caption
804 | DerivedArtifactKind::SyntaxSummary
805 | DerivedArtifactKind::SchemaSummary
806 | DerivedArtifactKind::GenerationFailure => EvidenceRole::Derived,
807 }
808}
809
810#[must_use]
812pub fn evidence_links_for_derived_artifacts(
813 artifacts: &[DerivedArtifact],
814 part_index: Option<u32>,
815) -> Vec<hirn_core::EvidenceLink> {
816 artifacts
817 .iter()
818 .filter(|artifact| artifact.kind != DerivedArtifactKind::GenerationFailure)
819 .map(|artifact| {
820 let mut link = hirn_core::EvidenceLink::new(
821 artifact.resource_id,
822 derived_artifact_evidence_role(artifact.kind),
823 )
824 .with_artifact(artifact.id)
825 .with_provenance(artifact.kind.evidence_provenance())
826 .with_description(artifact.kind.as_str());
827 if let Some(part_index) = part_index {
828 link = link.with_part_index(part_index);
829 }
830 link
831 })
832 .collect()
833}
834
835#[must_use]
837pub fn text_backed_resource_checksum(discriminator: &str, payload: &[u8]) -> String {
838 let mut hasher = blake3::Hasher::new();
839 hasher.update(discriminator.as_bytes());
840 hasher.update(&[0]);
841 hasher.update(payload);
842 format!("blake3:{}", hasher.finalize().to_hex())
843}
844
845fn artifact_failure_matches(
846 artifact: &DerivedArtifact,
847 intended_kind: DerivedArtifactKind,
848) -> bool {
849 artifact.kind == DerivedArtifactKind::GenerationFailure
850 && matches!(
851 artifact.metadata.get("intended_kind"),
852 Some(hirn_core::metadata::MetadataValue::String(value)) if value == intended_kind.as_str()
853 )
854}
855
856fn build_generation_failure_artifact(
857 resource: &ResourceObject,
858 role: EvidenceRole,
859 intended_kind: DerivedArtifactKind,
860 reason: &str,
861) -> Result<DerivedArtifact, HirnDbError> {
862 DerivedArtifact::builder()
863 .resource_id(resource.id)
864 .kind(DerivedArtifactKind::GenerationFailure)
865 .modality(ModalityProfile::Text)
866 .text_content(format!(
867 "{} generation failed: {reason}",
868 intended_kind.as_str()
869 ))
870 .metadata_entry("intended_kind", intended_kind.as_str().to_string())
871 .metadata_entry("failure_reason", reason.to_string())
872 .metadata_entry("source_role", role.as_str().to_string())
873 .namespace(resource.namespace)
874 .build()
875 .map_err(|error| HirnDbError::InvalidArgument(error.to_string()))
876}
877
878fn build_binary_derived_artifact(
879 resource: &ResourceObject,
880 kind: DerivedArtifactKind,
881 blob_bytes: &[u8],
882 mime_type: Option<&str>,
883 existing_artifacts: &[DerivedArtifact],
884) -> Result<(DerivedArtifact, Vec<u8>), String> {
885 match kind {
886 DerivedArtifactKind::Thumbnail => {
887 let (thumbnail_bytes, width, height) = generate_thumbnail_bytes(blob_bytes, mime_type)?;
888 let blob_index = next_derived_artifact_blob_index(existing_artifacts);
889 let mut builder = DerivedArtifact::builder()
890 .resource_id(resource.id)
891 .kind(DerivedArtifactKind::Thumbnail)
892 .modality(ModalityProfile::Image)
893 .mime_type("image/png")
894 .blob_index(blob_index)
895 .checksum(format!(
896 "blake3:{}",
897 blake3::hash(&thumbnail_bytes).to_hex()
898 ))
899 .metadata_entry("generation_strategy", "downscaled_source_image")
900 .metadata_entry("max_dimension_px", i64::from(THUMBNAIL_MAX_DIMENSION_PX))
901 .metadata_entry("width_px", i64::from(width))
902 .metadata_entry("height_px", i64::from(height))
903 .namespace(resource.namespace);
904 if let Some(mime_type) = mime_type {
905 builder = builder.metadata_entry("source_mime_type", mime_type.to_string());
906 }
907 let artifact = builder.build().map_err(|error| error.to_string())?;
908 Ok((artifact, thumbnail_bytes))
909 }
910 other => Err(format!(
911 "unsupported binary derived artifact kind: {}",
912 other.as_str()
913 )),
914 }
915}
916
917fn generate_thumbnail_bytes(
918 blob_bytes: &[u8],
919 mime_type: Option<&str>,
920) -> Result<(Vec<u8>, u32, u32), String> {
921 let image = if let Some(format) = mime_type.and_then(image_format_from_mime_type) {
922 image::load_from_memory_with_format(blob_bytes, format)
923 } else {
924 image::load_from_memory(blob_bytes)
925 }
926 .map_err(|error| format!("failed to decode image for thumbnail generation: {error}"))?;
927
928 let thumbnail = image.thumbnail(THUMBNAIL_MAX_DIMENSION_PX, THUMBNAIL_MAX_DIMENSION_PX);
929 let width = thumbnail.width();
930 let height = thumbnail.height();
931 let mut encoded = Cursor::new(Vec::new());
932 thumbnail
933 .write_to(&mut encoded, ImageFormat::Png)
934 .map_err(|error| format!("failed to encode thumbnail image: {error}"))?;
935 Ok((encoded.into_inner(), width, height))
936}
937
938fn image_format_from_mime_type(mime_type: &str) -> Option<ImageFormat> {
939 match mime_type {
940 "image/png" => Some(ImageFormat::Png),
941 "image/jpeg" | "image/jpg" => Some(ImageFormat::Jpeg),
942 "image/gif" => Some(ImageFormat::Gif),
943 "image/webp" => Some(ImageFormat::WebP),
944 "image/bmp" => Some(ImageFormat::Bmp),
945 "image/tiff" => Some(ImageFormat::Tiff),
946 _ => None,
947 }
948}
949
950const fn next_derived_artifact_blob_index(existing_artifacts: &[DerivedArtifact]) -> u32 {
951 let mut next = 1;
952 let mut idx = 0;
953 while idx < existing_artifacts.len() {
954 if let Some(blob_index) = existing_artifacts[idx].blob_index
955 && blob_index >= next
956 {
957 next = blob_index + 1;
958 }
959 idx += 1;
960 }
961 next
962}
963
964async fn persist_derived_artifact_with_blob(
965 store: &dyn PhysicalStore,
966 artifact: DerivedArtifact,
967 blob_bytes: Vec<u8>,
968) -> Result<(), HirnDbError> {
969 let blob_index = artifact.blob_index.ok_or_else(|| {
970 HirnDbError::InvalidArgument("blob-backed derived artifact requires blob_index".into())
971 })?;
972 let row = blob_ds::ResourceBlobRow {
973 resource_id: artifact.resource_id,
974 blob_index,
975 data: blob_bytes,
976 };
977 let batch = blob_ds::to_batch(std::slice::from_ref(&row))?;
978 store.append(blob_ds::DATASET_NAME, batch).await?;
979 if let Err(error) = persist_derived_artifact(store, artifact).await {
980 let filter = format!(
981 "resource_id = '{}' AND blob_index = {}",
982 row.resource_id, row.blob_index
983 );
984 let _ = store.delete(blob_ds::DATASET_NAME, &filter).await;
985 return Err(error);
986 }
987 Ok(())
988}
989
990pub async fn get_resource(
992 store: &dyn PhysicalStore,
993 resource_id: ResourceId,
994) -> Result<Option<ResourceObject>, HirnDbError> {
995 let Some(resource) = get_resource_raw(store, resource_id).await? else {
996 return Ok(None);
997 };
998
999 sanitize_resource_for_effective_head(store, resource)
1000 .await
1001 .map(Some)
1002}
1003
1004async fn get_resource_raw(
1005 store: &dyn PhysicalStore,
1006 resource_id: ResourceId,
1007) -> Result<Option<ResourceObject>, HirnDbError> {
1008 let filter = format!("id = '{}'", resource_id);
1009 let batches = store
1010 .scan(
1011 resource_object::DATASET_NAME,
1012 ScanOptions {
1013 filter: Some(filter),
1014 exact_filter: None,
1015 columns: None,
1016 order_by: None,
1017 limit: Some(1),
1018 offset: None,
1019 },
1020 )
1021 .await?;
1022
1023 for batch in &batches {
1024 let mut decoded = resource_object::from_batch(batch)?;
1025 if let Some(resource) = decoded.pop().filter(ResourceObject::is_storage_ready) {
1026 return Ok(Some(resource));
1027 }
1028 }
1029
1030 Ok(None)
1031}
1032
1033pub async fn load_resource_blob(
1035 store: &dyn PhysicalStore,
1036 resource_id: ResourceId,
1037 blob_index: u32,
1038) -> Result<Vec<u8>, HirnDbError> {
1039 let Some(resource) = get_resource_raw(store, resource_id).await? else {
1040 return Err(HirnDbError::BlobError {
1041 dataset: resource_object::DATASET_NAME.to_string(),
1042 details: format!("resource not found or not visible: {resource_id}"),
1043 });
1044 };
1045
1046 if effective_head_for_logical_id(store, resource.logical_resource_id)
1047 .await?
1048 .is_some_and(|head| head.governance_state.hides_payload())
1049 {
1050 return Err(HirnDbError::BlobError {
1051 dataset: blob_ds::DATASET_NAME.to_string(),
1052 details: format!(
1053 "resource payload unavailable: {resource_id} is governed by the active head"
1054 ),
1055 });
1056 }
1057
1058 load_resource_blob_unchecked(store, resource_id, blob_index).await
1059}
1060
1061async fn load_resource_blob_unchecked(
1062 store: &dyn PhysicalStore,
1063 resource_id: ResourceId,
1064 blob_index: u32,
1065) -> Result<Vec<u8>, HirnDbError> {
1066 let filter = format!(
1067 "resource_id = '{}' AND blob_index = {}",
1068 resource_id, blob_index
1069 );
1070 let batches = store
1071 .scan(
1072 blob_ds::DATASET_NAME,
1073 ScanOptions {
1074 filter: Some(filter),
1075 exact_filter: None,
1076 columns: Some(vec!["data".to_string()]),
1077 order_by: None,
1078 limit: Some(1),
1079 offset: None,
1080 },
1081 )
1082 .await?;
1083
1084 let batch = batches.first().ok_or_else(|| HirnDbError::BlobError {
1085 dataset: blob_ds::DATASET_NAME.to_string(),
1086 details: format!("resource blob not found: {resource_id}:{blob_index}"),
1087 })?;
1088 if batch.num_rows() == 0 {
1089 return Err(HirnDbError::BlobError {
1090 dataset: blob_ds::DATASET_NAME.to_string(),
1091 details: format!("resource blob not found: {resource_id}:{blob_index}"),
1092 });
1093 }
1094
1095 let array = batch
1096 .column_by_name("data")
1097 .ok_or_else(|| HirnDbError::InvalidArgument("missing data column".into()))?
1098 .as_any()
1099 .downcast_ref::<BinaryArray>()
1100 .ok_or_else(|| {
1101 HirnDbError::InvalidArgument("resource blob data column wrong type".into())
1102 })?;
1103 if array.is_null(0) {
1104 return Err(HirnDbError::BlobError {
1105 dataset: blob_ds::DATASET_NAME.to_string(),
1106 details: format!("resource blob was null: {resource_id}:{blob_index}"),
1107 });
1108 }
1109
1110 Ok(array.value(0).to_vec())
1111}
1112
1113pub async fn list_derived_artifacts(
1115 store: &dyn PhysicalStore,
1116 resource_id: ResourceId,
1117) -> Result<Vec<DerivedArtifact>, HirnDbError> {
1118 let Some(resource) = get_resource_raw(store, resource_id).await? else {
1119 return Ok(Vec::new());
1120 };
1121
1122 if effective_head_for_logical_id(store, resource.logical_resource_id)
1123 .await?
1124 .is_some_and(|head| head.governance_state.hides_payload())
1125 {
1126 return Ok(Vec::new());
1127 }
1128
1129 let filter = format!("resource_id = '{}'", resource_id);
1130 let batches = store
1131 .scan(
1132 artifact_ds::DATASET_NAME,
1133 ScanOptions {
1134 filter: Some(filter),
1135 exact_filter: None,
1136 columns: None,
1137 order_by: None,
1138 limit: None,
1139 offset: None,
1140 },
1141 )
1142 .await?;
1143
1144 let mut decoded = Vec::new();
1145 for batch in &batches {
1146 decoded.extend(artifact_ds::from_batch(batch)?);
1147 }
1148 Ok(decoded)
1149}
1150
1151pub async fn fetch_resource(
1153 store: &dyn PhysicalStore,
1154 resource_id: ResourceId,
1155 hydration_mode: HydrationMode,
1156) -> Result<Option<HydratedResource>, HirnDbError> {
1157 let Some(resource) = get_resource(store, resource_id).await? else {
1158 return Ok(None);
1159 };
1160
1161 let artifacts = if matches!(hydration_mode, HydrationMode::Preview | HydrationMode::Full) {
1162 list_derived_artifacts(store, resource_id).await?
1163 } else {
1164 Vec::new()
1165 };
1166
1167 let blob = if matches!(hydration_mode, HydrationMode::Full) {
1168 match resource.location {
1169 ResourceLocation::Blob { blob_index } => {
1170 Some(load_resource_blob(store, resource_id, blob_index).await?)
1171 }
1172 ResourceLocation::Inline | ResourceLocation::External { .. } => None,
1173 }
1174 } else {
1175 None
1176 };
1177
1178 Ok(Some(HydratedResource {
1179 resource,
1180 artifacts,
1181 blob,
1182 }))
1183}
1184
1185async fn find_live_resource_by_checksum(
1186 store: &dyn PhysicalStore,
1187 namespace: Namespace,
1188 checksum: &str,
1189) -> Result<Option<ResourceObject>, HirnDbError> {
1190 let escaped_checksum = checksum.replace('\'', "''");
1191 let escaped_namespace = namespace.as_str().replace('\'', "''");
1192 let filter = format!(
1193 "checksum = '{}' AND namespace = '{}'",
1194 escaped_checksum, escaped_namespace
1195 );
1196
1197 let batches = store
1198 .scan(
1199 resource_object::DATASET_NAME,
1200 ScanOptions {
1201 filter: Some(filter),
1202 exact_filter: None,
1203 columns: None,
1204 order_by: None,
1205 limit: Some(1),
1206 offset: None,
1207 },
1208 )
1209 .await?;
1210
1211 let mut matches = Vec::new();
1212 for batch in &batches {
1213 matches.extend(
1214 resource_object::from_batch(batch)?
1215 .into_iter()
1216 .filter(ResourceObject::is_storage_ready),
1217 );
1218 }
1219
1220 Ok(select_live_resource_match(&matches))
1221}
1222
1223async fn list_resource_revisions_for_logical_id(
1224 store: &dyn PhysicalStore,
1225 logical_resource_id: LogicalResourceId,
1226) -> Result<Vec<ResourceObject>, HirnDbError> {
1227 let escaped_logical_id = logical_resource_id.to_string().replace('\'', "''");
1228 let filter = format!("logical_resource_id = '{}'", escaped_logical_id);
1229 let batches = store
1230 .scan(
1231 resource_object::DATASET_NAME,
1232 ScanOptions {
1233 filter: Some(filter),
1234 exact_filter: None,
1235 columns: None,
1236 order_by: None,
1237 limit: None,
1238 offset: None,
1239 },
1240 )
1241 .await?;
1242
1243 let mut revisions = Vec::new();
1244 for batch in &batches {
1245 revisions.extend(
1246 resource_object::from_batch(batch)?
1247 .into_iter()
1248 .filter(ResourceObject::is_storage_ready),
1249 );
1250 }
1251 revisions.sort_by(|left, right| {
1252 left.version
1253 .cmp(&right.version)
1254 .then_with(|| left.created_at.millis().cmp(&right.created_at.millis()))
1255 });
1256 Ok(revisions)
1257}
1258
1259async fn effective_head_for_logical_id(
1260 store: &dyn PhysicalStore,
1261 logical_resource_id: LogicalResourceId,
1262) -> Result<Option<ResourceObject>, HirnDbError> {
1263 let revisions = list_resource_revisions_for_logical_id(store, logical_resource_id).await?;
1264 Ok(select_active_resource_head(&revisions))
1265}
1266
1267async fn sanitize_resource_for_effective_head(
1268 store: &dyn PhysicalStore,
1269 resource: ResourceObject,
1270) -> Result<ResourceObject, HirnDbError> {
1271 let head = effective_head_for_logical_id(store, resource.logical_resource_id).await?;
1272 Ok(apply_effective_head_governance(resource, head.as_ref()))
1273}
1274
1275fn apply_effective_head_governance(
1276 mut resource: ResourceObject,
1277 head: Option<&ResourceObject>,
1278) -> ResourceObject {
1279 let Some(head) = head.filter(|head| head.governance_state.hides_payload()) else {
1280 return resource;
1281 };
1282
1283 resource.governance_state = head.governance_state;
1284 resource.governance_reason = head.governance_reason.clone();
1285 resource.governed_at = head.governed_at;
1286 resource.location = ResourceLocation::Inline;
1287 resource.checksum = None;
1288 resource.size_bytes = 0;
1289 resource.mime_type = None;
1290 resource.display_name = Some(
1291 head.display_name
1292 .clone()
1293 .filter(|name| !name.trim().is_empty())
1294 .unwrap_or_else(|| head.governance_state.placeholder_display_name().to_string()),
1295 );
1296 resource
1297}
1298
1299fn select_active_resource_head(revisions: &[ResourceObject]) -> Option<ResourceObject> {
1300 revisions
1301 .iter()
1302 .filter(|resource| resource.is_storage_ready() && resource.superseded_by.is_none())
1303 .max_by_key(|resource| resource.version)
1304 .cloned()
1305 .or_else(|| {
1306 revisions
1307 .iter()
1308 .filter(|resource| resource.is_storage_ready())
1309 .max_by_key(|resource| resource.version)
1310 .cloned()
1311 })
1312}
1313
1314fn select_live_resource_match(revisions: &[ResourceObject]) -> Option<ResourceObject> {
1315 revisions
1316 .iter()
1317 .filter(|resource| resource.is_storage_ready() && resource.superseded_by.is_none())
1318 .max_by_key(|resource| resource.version)
1319 .cloned()
1320}
1321
1322fn build_successor_revision(
1323 current: &ResourceObject,
1324 reason: Option<String>,
1325 now: Timestamp,
1326) -> ResourceObject {
1327 let mut successor = current.clone();
1328 let successor_id = ResourceId::new();
1329 successor.id = successor_id;
1330 successor.logical_resource_id = current.logical_resource_id;
1331 successor.revision_id = ResourceRevisionId::from_resource_id(successor_id);
1332 successor.version = current.version + 1;
1333 successor.revision_operation = RevisionOperation::Supersede;
1334 successor.revision_reason = reason;
1335 successor.revision_causation_id = Some(current.id);
1336 successor.superseded_by = None;
1337 successor.created_at = now;
1338 successor.updated_at = now;
1339 successor
1340}
1341
1342async fn govern_resource(
1343 store: &dyn PhysicalStore,
1344 resource_id: ResourceId,
1345 state: ResourceGovernanceState,
1346 update: ResourceGovernanceUpdate,
1347) -> Result<ResourceObject, HirnDbError> {
1348 let Some(current) = get_resource_head(store, resource_id).await? else {
1349 return Err(HirnDbError::InvalidArgument(format!(
1350 "resource not found: {resource_id}"
1351 )));
1352 };
1353 if current.governance_state == state {
1354 return Ok(current);
1355 }
1356
1357 let now = Timestamp::now();
1358 let reason = normalize_optional_string(update.reason)
1359 .or_else(|| Some(format!("resource {}", state.as_str())));
1360 let mut successor = build_successor_revision(¤t, reason.clone(), now);
1361 successor.governance_state = state;
1362 successor.governance_reason = reason;
1363 successor.governed_at = Some(now);
1364 successor.display_name = Some(
1365 normalize_optional_string(update.placeholder_display_name)
1366 .unwrap_or_else(|| state.placeholder_display_name().to_string()),
1367 );
1368 successor.mime_type = None;
1369 successor.checksum = None;
1370 successor.size_bytes = 0;
1371 successor.location = ResourceLocation::Inline;
1372
1373 append_resource_revision(store, &successor, None).await?;
1374
1375 let mut updated_current = current.clone();
1376 updated_current.superseded_by = Some(successor.id);
1377 updated_current.updated_at = now;
1378 if let Err(error) = upsert_resource_revision(store, &updated_current).await {
1379 rollback_resource_revision(store, &successor).await;
1380 return Err(error);
1381 }
1382
1383 let _ = delete_lineage_payloads_and_artifacts(store, current.logical_resource_id).await;
1384
1385 Ok(successor)
1386}
1387
1388async fn list_active_resource_heads(
1389 store: &dyn PhysicalStore,
1390) -> Result<Vec<ResourceObject>, HirnDbError> {
1391 let batches = store
1392 .scan(resource_object::DATASET_NAME, ScanOptions::default())
1393 .await?;
1394 let mut grouped: BTreeMap<LogicalResourceId, Vec<ResourceObject>> = BTreeMap::new();
1395 for batch in &batches {
1396 for resource in resource_object::from_batch(batch)? {
1397 if !resource.is_storage_ready() {
1398 continue;
1399 }
1400 grouped
1401 .entry(resource.logical_resource_id)
1402 .or_default()
1403 .push(resource);
1404 }
1405 }
1406
1407 Ok(grouped
1408 .into_values()
1409 .filter_map(|revisions| select_active_resource_head(&revisions))
1410 .collect())
1411}
1412
1413const fn governance_state_for_action(action: ResourceRetentionAction) -> ResourceGovernanceState {
1414 match action {
1415 ResourceRetentionAction::Redact => ResourceGovernanceState::Redacted,
1416 ResourceRetentionAction::Purge => ResourceGovernanceState::Purged,
1417 }
1418}
1419
1420async fn delete_lineage_payloads_and_artifacts(
1421 store: &dyn PhysicalStore,
1422 logical_resource_id: LogicalResourceId,
1423) -> Result<(), HirnDbError> {
1424 let resource_ids = list_resource_revisions_for_logical_id(store, logical_resource_id)
1425 .await?
1426 .into_iter()
1427 .map(|resource| resource.id)
1428 .collect::<Vec<_>>();
1429 if resource_ids.is_empty() {
1430 return Ok(());
1431 }
1432
1433 delete_rows_for_resource_ids(store, blob_ds::DATASET_NAME, "resource_id", &resource_ids)
1434 .await?;
1435 delete_rows_for_resource_ids(
1436 store,
1437 artifact_ds::DATASET_NAME,
1438 "resource_id",
1439 &resource_ids,
1440 )
1441 .await
1442}
1443
1444async fn delete_rows_for_resource_ids(
1445 store: &dyn PhysicalStore,
1446 dataset: &str,
1447 column: &str,
1448 resource_ids: &[ResourceId],
1449) -> Result<(), HirnDbError> {
1450 if resource_ids.is_empty() {
1451 return Ok(());
1452 }
1453
1454 let filter = resource_ids
1455 .iter()
1456 .map(|resource_id| {
1457 format!(
1458 "{column} = '{}'",
1459 resource_id.to_string().replace('\'', "''")
1460 )
1461 })
1462 .collect::<Vec<_>>()
1463 .join(" OR ");
1464 store.delete(dataset, &filter).await.map(|_| ())
1465}
1466
1467async fn append_resource_revision(
1468 store: &dyn PhysicalStore,
1469 resource: &ResourceObject,
1470 blob: Option<Vec<u8>>,
1471) -> Result<(), HirnDbError> {
1472 let mut staged_resource = resource.clone();
1473 let requires_finalize = matches!(
1474 (&staged_resource.location, blob.as_ref()),
1475 (ResourceLocation::Blob { .. }, Some(_))
1476 );
1477 if requires_finalize {
1478 staged_resource.storage_ready = false;
1479 }
1480
1481 let batch = resource_object::to_batch(std::slice::from_ref(&staged_resource))?;
1482 store.append(resource_object::DATASET_NAME, batch).await?;
1483
1484 if let (ResourceLocation::Blob { blob_index }, Some(blob_bytes)) =
1485 (&staged_resource.location, blob)
1486 {
1487 let row = blob_ds::ResourceBlobRow {
1488 resource_id: staged_resource.id,
1489 blob_index: *blob_index,
1490 data: blob_bytes,
1491 };
1492 let batch = blob_ds::to_batch(std::slice::from_ref(&row))?;
1493 if let Err(error) = store.append(blob_ds::DATASET_NAME, batch).await {
1494 rollback_resource_revision(store, &staged_resource).await;
1495 return Err(error);
1496 }
1497 }
1498
1499 if requires_finalize {
1500 staged_resource.storage_ready = true;
1501 if let Err(error) = upsert_resource_revision(store, &staged_resource).await {
1502 rollback_resource_revision(store, &staged_resource).await;
1503 return Err(error);
1504 }
1505 }
1506
1507 Ok(())
1508}
1509
1510async fn upsert_resource_revision(
1511 store: &dyn PhysicalStore,
1512 resource: &ResourceObject,
1513) -> Result<(), HirnDbError> {
1514 let batch = resource_object::to_batch(std::slice::from_ref(resource))?;
1515 store
1516 .merge_insert(resource_object::DATASET_NAME, &["id"], batch)
1517 .await
1518}
1519
1520async fn rollback_resource_revision(store: &dyn PhysicalStore, resource: &ResourceObject) {
1521 let resource_filter = format!("id = '{}'", resource.id);
1522 let _ = store
1523 .delete(resource_object::DATASET_NAME, &resource_filter)
1524 .await;
1525
1526 if matches!(resource.location, ResourceLocation::Blob { .. }) {
1527 let blob_filter = format!("resource_id = '{}'", resource.id);
1528 let _ = store.delete(blob_ds::DATASET_NAME, &blob_filter).await;
1529 }
1530}
1531
1532async fn prepare_blob_payload(
1533 store: &dyn PhysicalStore,
1534 current: Option<&ResourceObject>,
1535 resource: &mut ResourceObject,
1536 blob: Option<Vec<u8>>,
1537) -> Result<Option<Vec<u8>>, HirnDbError> {
1538 match (&resource.location, blob) {
1539 (ResourceLocation::Blob { .. }, Some(blob_bytes)) => {
1540 sync_blob_metadata(resource, &blob_bytes)?;
1541 Ok(Some(blob_bytes))
1542 }
1543 (ResourceLocation::Blob { .. }, None) => {
1544 let Some(current) = current else {
1545 return Err(HirnDbError::InvalidArgument(
1546 "blob-backed resource requires payload bytes".into(),
1547 ));
1548 };
1549 let ResourceLocation::Blob { blob_index } = current.location else {
1550 return Err(HirnDbError::InvalidArgument(
1551 "cannot supersede a non-blob resource without new payload bytes".into(),
1552 ));
1553 };
1554 let blob_bytes = load_resource_blob_unchecked(store, current.id, blob_index).await?;
1555 sync_blob_metadata(resource, &blob_bytes)?;
1556 Ok(Some(blob_bytes))
1557 }
1558 (ResourceLocation::Inline | ResourceLocation::External { .. }, Some(_)) => {
1559 Err(HirnDbError::InvalidArgument(
1560 "only ResourceLocation::Blob may carry persisted payload bytes".into(),
1561 ))
1562 }
1563 (ResourceLocation::Inline | ResourceLocation::External { .. }, None) => Ok(None),
1564 }
1565}
1566
1567fn sync_blob_metadata(resource: &mut ResourceObject, blob: &[u8]) -> Result<(), HirnDbError> {
1568 if resource.checksum.is_none() {
1569 resource.checksum = Some(format!("blake3:{}", blake3::hash(blob).to_hex()));
1570 }
1571 resource.size_bytes = blob.len() as u64;
1572 Ok(())
1573}
1574
/// Aggregate usage of the active resource heads that fall inside one quota scope.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct ResourceQuotaUsage {
    /// Number of active heads matched by the scope.
    active_resources: usize,
    /// Sum of `size_bytes` over those heads, accumulated with saturating addition.
    total_bytes: u64,
}
1580
1581async fn enforce_resource_quota_policy(
1582 store: &dyn PhysicalStore,
1583 new_head: &ResourceObject,
1584 replaced_head: Option<&ResourceObject>,
1585 quota_policy: &ResourceQuotaPolicy,
1586) -> Result<(), HirnDbError> {
1587 if quota_policy.is_empty() {
1588 return Ok(());
1589 }
1590
1591 let active_heads = list_active_resource_heads(store).await?;
1592 for rule in quota_policy.rules_for(new_head) {
1593 let usage = quota_usage_for_scope(&active_heads, rule.scope);
1594 let projected_active_resources =
1595 usage.active_resources + usize::from(replaced_head.is_none());
1596 if let Some(max_active_resources) = rule.max_active_resources
1597 && projected_active_resources > max_active_resources
1598 {
1599 return Err(HirnDbError::LimitExceeded(format!(
1600 "resource quota exceeded for {}: projected {} active resources exceeds limit {}",
1601 quota_scope_label(rule.scope),
1602 projected_active_resources,
1603 max_active_resources,
1604 )));
1605 }
1606
1607 let replaced_bytes = replaced_head.map_or(0, |head| head.size_bytes);
1608 let projected_total_bytes = usage
1609 .total_bytes
1610 .saturating_sub(replaced_bytes)
1611 .saturating_add(new_head.size_bytes);
1612 if let Some(max_total_bytes) = rule.max_total_bytes
1613 && projected_total_bytes > max_total_bytes
1614 {
1615 return Err(HirnDbError::LimitExceeded(format!(
1616 "resource quota exceeded for {}: projected {} bytes exceeds limit {}",
1617 quota_scope_label(rule.scope),
1618 projected_total_bytes,
1619 max_total_bytes,
1620 )));
1621 }
1622 }
1623
1624 Ok(())
1625}
1626
1627fn quota_usage_for_scope(
1628 active_heads: &[ResourceObject],
1629 scope: ResourceQuotaScope,
1630) -> ResourceQuotaUsage {
1631 active_heads
1632 .iter()
1633 .filter(|resource| scope.matches(resource))
1634 .fold(ResourceQuotaUsage::default(), |mut usage, resource| {
1635 usage.active_resources += 1;
1636 usage.total_bytes = usage.total_bytes.saturating_add(resource.size_bytes);
1637 usage
1638 })
1639}
1640
1641fn quota_scope_label(scope: ResourceQuotaScope) -> String {
1642 match scope {
1643 ResourceQuotaScope::Realm => "realm".to_string(),
1644 ResourceQuotaScope::Namespace(namespace) => format!("namespace `{}`", namespace.as_str()),
1645 ResourceQuotaScope::Agent(agent_id) => format!("agent `{}`", agent_id.as_str()),
1646 }
1647}
1648
1649const fn default_text_artifact_plan(
1650 modality: ModalityProfile,
1651 role: EvidenceRole,
1652) -> &'static [DefaultTextArtifactPlan] {
1653 match role {
1654 EvidenceRole::Source => match modality {
1655 ModalityProfile::Image => &IMAGE_SOURCE_TEXT_ARTIFACTS,
1656 ModalityProfile::Audio => &AUDIO_SOURCE_TEXT_ARTIFACTS,
1657 ModalityProfile::Code => &CODE_SOURCE_TEXT_ARTIFACTS,
1658 ModalityProfile::Structured => &STRUCTURED_SOURCE_TEXT_ARTIFACTS,
1659 ModalityProfile::Text => &NO_TEXT_ARTIFACTS,
1660 _ => &PREVIEW_TEXT_ARTIFACTS,
1661 },
1662 EvidenceRole::Attachment
1663 | EvidenceRole::Proof
1664 | EvidenceRole::Output
1665 | EvidenceRole::Preview
1666 | EvidenceRole::Derived => &PREVIEW_TEXT_ARTIFACTS,
1667 }
1668}
1669
1670const fn default_blob_artifact_plan(
1671 modality: ModalityProfile,
1672 role: EvidenceRole,
1673) -> &'static [DefaultBlobArtifactPlan] {
1674 match role {
1675 EvidenceRole::Source => match modality {
1676 ModalityProfile::Image => &IMAGE_SOURCE_BLOB_ARTIFACTS,
1677 _ => &NO_BLOB_ARTIFACTS,
1678 },
1679 EvidenceRole::Attachment
1680 | EvidenceRole::Proof
1681 | EvidenceRole::Output
1682 | EvidenceRole::Preview
1683 | EvidenceRole::Derived => &NO_BLOB_ARTIFACTS,
1684 }
1685}
1686
/// Trims surrounding whitespace and maps missing or blank strings to `None`.
fn normalize_optional_string(value: Option<String>) -> Option<String> {
    let raw = value?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
1692
1693#[cfg(test)]
1694mod tests {
1695 use super::*;
1696 use std::sync::Arc;
1697
1698 use arrow_array::RecordBatch;
1699 use async_trait::async_trait;
1700 use datafusion::catalog::TableProvider;
1701 use hirn_core::{
1702 DerivedArtifact, DerivedArtifactKind, ModalityProfile, ResourceGovernanceState,
1703 ResourceLocation, ResourceObject, ResourceQuotaPolicy, ResourceQuotaRule,
1704 ResourceQuotaScope, ResourceRetentionAction, ResourceRetentionPolicy,
1705 ResourceRetentionRule, RevisionState,
1706 };
1707
1708 use crate::HirnDbError;
1709 use crate::datasets::{resource_blob as blob_ds, resource_object};
1710 use crate::memory_store::MemoryStore;
1711 use crate::mutation_envelope_ops::{MutationEnvelopeState, get_mutation_envelope};
1712 use crate::policy_store::{CURRENT_PRINCIPAL, NamespacePolicy, PolicyEnforcedStore};
1713 use crate::store::{
1714 ColumnTransform, CompactOptions, CompactResult, DatasetInfo, FtsSearchOptions,
1715 HybridSearchOptions, IndexConfig, MultivectorSearchOptions, PhysicalStore, ScanOptions,
1716 VectorSearchOptions, VersionTag,
1717 };
1718
1719 struct FaultInjectingStore {
1720 inner: MemoryStore,
1721 fail_blob_append: bool,
1722 fail_resource_merge_insert: bool,
1723 }
1724
1725 #[async_trait]
1726 impl PhysicalStore for FaultInjectingStore {
1727 async fn append(&self, dataset: &str, batch: RecordBatch) -> Result<(), HirnDbError> {
1728 if self.fail_blob_append && dataset == blob_ds::DATASET_NAME {
1729 return Err(HirnDbError::Unsupported(
1730 "simulated blob append failure".to_string(),
1731 ));
1732 }
1733 self.inner.append(dataset, batch).await
1734 }
1735
1736 async fn append_batches(
1737 &self,
1738 dataset: &str,
1739 batches: Vec<RecordBatch>,
1740 ) -> Result<(), HirnDbError> {
1741 for batch in batches {
1742 self.append(dataset, batch).await?;
1743 }
1744 Ok(())
1745 }
1746
1747 async fn scan(
1748 &self,
1749 dataset: &str,
1750 opts: ScanOptions,
1751 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1752 self.inner.scan(dataset, opts).await
1753 }
1754
1755 async fn scan_stream(
1756 &self,
1757 dataset: &str,
1758 opts: ScanOptions,
1759 ) -> Result<crate::store::RecordBatchStream, HirnDbError> {
1760 self.inner.scan_stream(dataset, opts).await
1761 }
1762
1763 async fn delete(&self, dataset: &str, predicate: &str) -> Result<u64, HirnDbError> {
1764 self.inner.delete(dataset, predicate).await
1765 }
1766
1767 async fn merge_insert(
1768 &self,
1769 dataset: &str,
1770 on: &[&str],
1771 batch: RecordBatch,
1772 ) -> Result<(), HirnDbError> {
1773 if self.fail_resource_merge_insert && dataset == resource_object::DATASET_NAME {
1774 return Err(HirnDbError::Unsupported(
1775 "simulated resource finalize failure".to_string(),
1776 ));
1777 }
1778 self.inner.merge_insert(dataset, on, batch).await
1779 }
1780
1781 async fn update_where(
1782 &self,
1783 dataset: &str,
1784 filter: &str,
1785 updates: &[(&str, &str)],
1786 ) -> Result<u64, HirnDbError> {
1787 self.inner.update_where(dataset, filter, updates).await
1788 }
1789
1790 async fn count(&self, dataset: &str, filter: Option<&str>) -> Result<u64, HirnDbError> {
1791 self.inner.count(dataset, filter).await
1792 }
1793
1794 async fn vector_search(
1795 &self,
1796 dataset: &str,
1797 opts: VectorSearchOptions,
1798 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1799 self.inner.vector_search(dataset, opts).await
1800 }
1801
1802 async fn vector_search_many(
1803 &self,
1804 dataset: &str,
1805 queries: Vec<VectorSearchOptions>,
1806 ) -> Result<Vec<Vec<RecordBatch>>, HirnDbError> {
1807 self.inner.vector_search_many(dataset, queries).await
1808 }
1809
1810 async fn fts_search(
1811 &self,
1812 dataset: &str,
1813 opts: FtsSearchOptions,
1814 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1815 self.inner.fts_search(dataset, opts).await
1816 }
1817
1818 async fn hybrid_search(
1819 &self,
1820 dataset: &str,
1821 opts: HybridSearchOptions,
1822 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1823 self.inner.hybrid_search(dataset, opts).await
1824 }
1825
1826 async fn multivector_search(
1827 &self,
1828 dataset: &str,
1829 opts: MultivectorSearchOptions,
1830 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1831 self.inner.multivector_search(dataset, opts).await
1832 }
1833
1834 async fn create_index(
1835 &self,
1836 dataset: &str,
1837 config: IndexConfig,
1838 ) -> Result<(), HirnDbError> {
1839 self.inner.create_index(dataset, config).await
1840 }
1841
1842 async fn optimize_indices(&self, dataset: &str) -> Result<(), HirnDbError> {
1843 self.inner.optimize_indices(dataset).await
1844 }
1845
1846 async fn compact(
1847 &self,
1848 dataset: &str,
1849 opts: CompactOptions,
1850 ) -> Result<CompactResult, HirnDbError> {
1851 self.inner.compact(dataset, opts).await
1852 }
1853
1854 async fn version(&self, dataset: &str) -> Result<u64, HirnDbError> {
1855 self.inner.version(dataset).await
1856 }
1857
1858 async fn tag(&self, dataset: &str, tag: &str) -> Result<(), HirnDbError> {
1859 self.inner.tag(dataset, tag).await
1860 }
1861
1862 async fn checkout(&self, dataset: &str, version: u64) -> Result<(), HirnDbError> {
1863 self.inner.checkout(dataset, version).await
1864 }
1865
1866 async fn list_tags(&self, dataset: &str) -> Result<Vec<VersionTag>, HirnDbError> {
1867 self.inner.list_tags(dataset).await
1868 }
1869
1870 async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, HirnDbError> {
1871 self.inner.list_datasets().await
1872 }
1873
1874 async fn exists(&self, dataset: &str) -> Result<bool, HirnDbError> {
1875 self.inner.exists(dataset).await
1876 }
1877
1878 async fn list_namespaces(&self) -> Result<Vec<String>, HirnDbError> {
1879 self.inner.list_namespaces().await
1880 }
1881
1882 async fn create_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1883 self.inner.create_namespace(name).await
1884 }
1885
1886 async fn drop_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1887 self.inner.drop_namespace(name).await
1888 }
1889
1890 async fn add_columns(
1891 &self,
1892 dataset: &str,
1893 transforms: Vec<ColumnTransform>,
1894 ) -> Result<(), HirnDbError> {
1895 self.inner.add_columns(dataset, transforms).await
1896 }
1897
1898 async fn drop_columns(&self, dataset: &str, columns: &[&str]) -> Result<(), HirnDbError> {
1899 self.inner.drop_columns(dataset, columns).await
1900 }
1901
1902 async fn table_provider(&self, dataset: &str) -> Option<Arc<dyn TableProvider>> {
1903 self.inner.table_provider(dataset).await
1904 }
1905 }
1906
1907 #[tokio::test(flavor = "multi_thread")]
1908 async fn persist_resource_deduplicates_by_checksum_within_namespace() {
1909 let store = MemoryStore::new();
1910 let blob = vec![1_u8; 2048];
1911
1912 let first = ResourceObject::builder()
1913 .modality(ModalityProfile::Image)
1914 .mime_type("image/png")
1915 .checksum("blake3:dedup")
1916 .size_bytes(blob.len() as u64)
1917 .location(ResourceLocation::Blob { blob_index: 0 })
1918 .build()
1919 .unwrap();
1920 let second = ResourceObject::builder()
1921 .modality(ModalityProfile::Image)
1922 .mime_type("image/png")
1923 .checksum("blake3:dedup")
1924 .size_bytes(blob.len() as u64)
1925 .location(ResourceLocation::Blob { blob_index: 0 })
1926 .build()
1927 .unwrap();
1928
1929 let persisted_first = persist_resource(&store, first, Some(blob.clone()))
1930 .await
1931 .unwrap();
1932 let persisted_second = persist_resource(&store, second, Some(blob)).await.unwrap();
1933
1934 assert_eq!(persisted_first.id, persisted_second.id);
1935
1936 let resources = store
1937 .scan(resource_object::DATASET_NAME, ScanOptions::default())
1938 .await
1939 .unwrap();
1940 let blobs = store
1941 .scan(blob_ds::DATASET_NAME, ScanOptions::default())
1942 .await
1943 .unwrap();
1944
1945 assert_eq!(resources.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
1946 assert_eq!(blobs.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
1947 }
1948
1949 #[tokio::test(flavor = "multi_thread")]
1950 async fn persist_resource_rolls_back_when_blob_append_fails() {
1951 let store = FaultInjectingStore {
1952 inner: MemoryStore::new(),
1953 fail_blob_append: true,
1954 fail_resource_merge_insert: false,
1955 };
1956 let blob = vec![4_u8; 128];
1957 let resource = ResourceObject::builder()
1958 .modality(ModalityProfile::Document)
1959 .location(ResourceLocation::Blob { blob_index: 0 })
1960 .build()
1961 .unwrap();
1962 let resource_id = resource.id;
1963
1964 let error = persist_resource(&store, resource, Some(blob))
1965 .await
1966 .unwrap_err();
1967
1968 assert!(matches!(error, HirnDbError::Unsupported(_)));
1969 assert!(get_resource(&store, resource_id).await.unwrap().is_none());
1970 assert!(
1971 get_resource_head(&store, resource_id)
1972 .await
1973 .unwrap()
1974 .is_none()
1975 );
1976 assert_eq!(
1977 store
1978 .scan(resource_object::DATASET_NAME, ScanOptions::default())
1979 .await
1980 .unwrap()
1981 .iter()
1982 .map(|batch| batch.num_rows())
1983 .sum::<usize>(),
1984 0
1985 );
1986 assert_eq!(
1987 store
1988 .scan(blob_ds::DATASET_NAME, ScanOptions::default())
1989 .await
1990 .unwrap()
1991 .iter()
1992 .map(|batch| batch.num_rows())
1993 .sum::<usize>(),
1994 0
1995 );
1996 }
1997
1998 #[tokio::test(flavor = "multi_thread")]
1999 async fn persist_resource_rolls_back_when_visibility_finalize_fails() {
2000 let store = FaultInjectingStore {
2001 inner: MemoryStore::new(),
2002 fail_blob_append: false,
2003 fail_resource_merge_insert: true,
2004 };
2005 let blob = vec![6_u8; 128];
2006 let resource = ResourceObject::builder()
2007 .modality(ModalityProfile::Document)
2008 .location(ResourceLocation::Blob { blob_index: 0 })
2009 .build()
2010 .unwrap();
2011 let resource_id = resource.id;
2012
2013 let error = persist_resource(&store, resource, Some(blob))
2014 .await
2015 .unwrap_err();
2016
2017 assert!(matches!(error, HirnDbError::Unsupported(_)));
2018 assert!(get_resource(&store, resource_id).await.unwrap().is_none());
2019 assert!(
2020 get_resource_head(&store, resource_id)
2021 .await
2022 .unwrap()
2023 .is_none()
2024 );
2025 assert_eq!(
2026 store
2027 .scan(resource_object::DATASET_NAME, ScanOptions::default())
2028 .await
2029 .unwrap()
2030 .iter()
2031 .map(|batch| batch.num_rows())
2032 .sum::<usize>(),
2033 0
2034 );
2035 assert_eq!(
2036 store
2037 .scan(blob_ds::DATASET_NAME, ScanOptions::default())
2038 .await
2039 .unwrap()
2040 .iter()
2041 .map(|batch| batch.num_rows())
2042 .sum::<usize>(),
2043 0
2044 );
2045 }
2046
2047 #[tokio::test(flavor = "multi_thread")]
2048 async fn persist_resource_dedup_does_not_cross_namespaces() {
2049 let store = MemoryStore::new();
2050 let blob = vec![5_u8; 64];
2051
2052 let alpha = ResourceObject::builder()
2053 .modality(ModalityProfile::Document)
2054 .checksum("blake3:isolation")
2055 .size_bytes(blob.len() as u64)
2056 .namespace(Namespace::new("alpha").unwrap())
2057 .location(ResourceLocation::Blob { blob_index: 0 })
2058 .build()
2059 .unwrap();
2060 let beta = ResourceObject::builder()
2061 .modality(ModalityProfile::Document)
2062 .checksum("blake3:isolation")
2063 .size_bytes(blob.len() as u64)
2064 .namespace(Namespace::new("beta").unwrap())
2065 .location(ResourceLocation::Blob { blob_index: 0 })
2066 .build()
2067 .unwrap();
2068
2069 let alpha = persist_resource(&store, alpha, Some(blob.clone()))
2070 .await
2071 .unwrap();
2072 let beta = persist_resource(&store, beta, Some(blob)).await.unwrap();
2073
2074 assert_ne!(alpha.id, beta.id);
2075 }
2076
2077 #[tokio::test(flavor = "multi_thread")]
2078 async fn fetch_resource_respects_hydration_mode() {
2079 let store = MemoryStore::new();
2080 let blob = vec![9_u8; 512];
2081 let resource = ResourceObject::builder()
2082 .modality(ModalityProfile::Audio)
2083 .checksum("blake3:preview")
2084 .size_bytes(blob.len() as u64)
2085 .location(ResourceLocation::Blob { blob_index: 0 })
2086 .build()
2087 .unwrap();
2088 let resource = persist_resource(&store, resource, Some(blob.clone()))
2089 .await
2090 .unwrap();
2091
2092 let mut artifact = DerivedArtifact::builder()
2093 .resource_id(resource.id)
2094 .kind(DerivedArtifactKind::Transcript)
2095 .modality(ModalityProfile::Text)
2096 .text_content("preview transcript")
2097 .build()
2098 .unwrap();
2099 artifact.created_at = hirn_core::Timestamp::from_millis(artifact.created_at.millis());
2100 persist_derived_artifact(&store, artifact.clone())
2101 .await
2102 .unwrap();
2103
2104 let metadata_only = fetch_resource(&store, resource.id, HydrationMode::MetadataOnly)
2105 .await
2106 .unwrap()
2107 .unwrap();
2108 assert!(metadata_only.artifacts.is_empty());
2109 assert!(metadata_only.blob.is_none());
2110
2111 let preview = fetch_resource(&store, resource.id, HydrationMode::Preview)
2112 .await
2113 .unwrap()
2114 .unwrap();
2115 assert_eq!(preview.artifacts, vec![artifact.clone()]);
2116 assert!(preview.blob.is_none());
2117
2118 let full = fetch_resource(&store, resource.id, HydrationMode::Full)
2119 .await
2120 .unwrap()
2121 .unwrap();
2122 assert_eq!(full.artifacts, vec![artifact]);
2123 assert_eq!(full.blob, Some(blob));
2124 }
2125
2126 #[tokio::test(flavor = "multi_thread")]
2127 async fn persist_default_derived_artifacts_adds_caption_ocr_and_thumbnail_for_images() {
2128 let store = MemoryStore::new();
2129 let source_image = image::DynamicImage::new_rgba8(4, 4);
2130 let mut encoded = Cursor::new(Vec::new());
2131 source_image
2132 .write_to(&mut encoded, ImageFormat::Png)
2133 .unwrap();
2134 let blob = encoded.into_inner();
2135 let resource = ResourceObject::builder()
2136 .modality(ModalityProfile::Image)
2137 .mime_type("image/png")
2138 .location(ResourceLocation::Blob { blob_index: 0 })
2139 .build()
2140 .unwrap();
2141 let resource = persist_resource_with_quota_policy(
2142 &store,
2143 resource,
2144 Some(blob.clone()),
2145 &ResourceQuotaPolicy::default(),
2146 )
2147 .await
2148 .unwrap();
2149 let created = persist_default_derived_artifacts(
2150 &store,
2151 &resource,
2152 EvidenceRole::Source,
2153 DerivedArtifactInput::new("diagram of the auth handshake")
2154 .with_blob(&blob, Some("image/png")),
2155 )
2156 .await
2157 .unwrap();
2158 let links = evidence_links_for_derived_artifacts(&created, Some(0));
2159
2160 let artifacts = list_derived_artifacts(&store, resource.id).await.unwrap();
2161 assert_eq!(artifacts.len(), 3);
2162 assert_eq!(artifacts[0].kind, DerivedArtifactKind::Caption);
2163 assert_eq!(
2164 artifacts[0].text_content.as_deref(),
2165 Some("diagram of the auth handshake")
2166 );
2167 assert_eq!(artifacts[1].kind, DerivedArtifactKind::OcrText);
2168 assert_eq!(
2169 artifacts[1].text_content.as_deref(),
2170 Some("diagram of the auth handshake")
2171 );
2172 assert_eq!(artifacts[2].kind, DerivedArtifactKind::Thumbnail);
2173 assert_eq!(artifacts[2].mime_type.as_deref(), Some("image/png"));
2174 assert_eq!(artifacts[2].blob_index, Some(1));
2175 assert!(artifacts[2].text_content.is_none());
2176 assert!(matches!(
2177 artifacts[1].metadata.get("generation_strategy"),
2178 Some(hirn_core::metadata::MetadataValue::String(value)) if value == "text_surrogate_fallback"
2179 ));
2180 assert!(matches!(
2181 artifacts[1].metadata.get("fallback_source"),
2182 Some(hirn_core::metadata::MetadataValue::String(value)) if value == "image_description"
2183 ));
2184 assert_eq!(links.len(), 3);
2185 assert_eq!(links[0].role, EvidenceRole::Derived);
2186 assert_eq!(links[0].provenance.as_str(), "transformed_summary");
2187 assert_eq!(links[1].role, EvidenceRole::Derived);
2188 assert_eq!(links[1].provenance.as_str(), "generated_artifact");
2189 assert_eq!(links[2].role, EvidenceRole::Preview);
2190 assert_eq!(links[2].provenance.as_str(), "generated_artifact");
2191
2192 let thumbnail_blob = load_resource_blob(&store, resource.id, 1).await.unwrap();
2193 let thumbnail =
2194 image::load_from_memory_with_format(&thumbnail_blob, ImageFormat::Png).unwrap();
2195 assert!(thumbnail.width() <= THUMBNAIL_MAX_DIMENSION_PX);
2196 assert!(thumbnail.height() <= THUMBNAIL_MAX_DIMENSION_PX);
2197 }
2198
2199 #[tokio::test(flavor = "multi_thread")]
2200 async fn persist_default_derived_artifacts_records_generation_failure_for_empty_inputs() {
2201 let store = MemoryStore::new();
2202 let blob = vec![7_u8; 48];
2203 let resource = ResourceObject::builder()
2204 .modality(ModalityProfile::Image)
2205 .mime_type("image/png")
2206 .location(ResourceLocation::Blob { blob_index: 0 })
2207 .build()
2208 .unwrap();
2209 let resource = persist_resource_with_quota_policy(
2210 &store,
2211 resource,
2212 Some(blob.clone()),
2213 &ResourceQuotaPolicy::default(),
2214 )
2215 .await
2216 .unwrap();
2217 let created = persist_default_derived_artifacts(
2218 &store,
2219 &resource,
2220 EvidenceRole::Source,
2221 DerivedArtifactInput::new(""),
2222 )
2223 .await
2224 .unwrap();
2225
2226 let artifacts = list_derived_artifacts(&store, resource.id).await.unwrap();
2227 assert_eq!(artifacts.len(), 2);
2228 assert!(
2229 artifacts
2230 .iter()
2231 .all(|artifact| artifact.kind == DerivedArtifactKind::GenerationFailure)
2232 );
2233 assert!(artifacts.iter().any(|artifact| {
2234 artifact.text_content.as_deref()
2235 == Some("caption generation failed: source text was empty")
2236 }));
2237 assert!(artifacts.iter().any(|artifact| {
2238 artifact.text_content.as_deref()
2239 == Some("thumbnail generation failed: source blob was unavailable")
2240 }));
2241 assert!(evidence_links_for_derived_artifacts(&created, Some(0)).is_empty());
2242
2243 let hydrated = fetch_resource(&store, resource.id, HydrationMode::Full)
2244 .await
2245 .unwrap()
2246 .unwrap();
2247 assert_eq!(hydrated.blob, Some(blob));
2248 }
2249
2250 #[tokio::test(flavor = "multi_thread")]
2251 async fn supersede_resource_promotes_new_head_and_preserves_history() {
2252 let store = MemoryStore::new();
2253 let original_blob = vec![7_u8; 32];
2254 let original = ResourceObject::builder()
2255 .modality(ModalityProfile::Image)
2256 .mime_type("image/png")
2257 .display_name("frame-v1.png")
2258 .checksum(format!("blake3:{}", blake3::hash(&original_blob).to_hex()))
2259 .size_bytes(original_blob.len() as u64)
2260 .location(ResourceLocation::Blob { blob_index: 0 })
2261 .build()
2262 .unwrap();
2263 let original = persist_resource(&store, original, Some(original_blob.clone()))
2264 .await
2265 .unwrap();
2266
2267 let successor_blob = vec![8_u8; 48];
2268 let successor = supersede_resource(
2269 &store,
2270 original.id,
2271 ResourceSupersession {
2272 reason: Some("cropped and re-encoded".into()),
2273 display_name: Some("frame-v2.png".into()),
2274 checksum: Some(format!("blake3:{}", blake3::hash(&successor_blob).to_hex())),
2275 ..ResourceSupersession::default()
2276 },
2277 Some(successor_blob.clone()),
2278 )
2279 .await
2280 .unwrap();
2281
2282 let active_head = get_resource_head(&store, original.id)
2283 .await
2284 .unwrap()
2285 .unwrap();
2286 assert_eq!(active_head.id, successor.id);
2287 assert_eq!(
2288 active_head.logical_resource_id,
2289 original.logical_resource_id
2290 );
2291 assert_eq!(active_head.version, 2);
2292 assert_eq!(active_head.revision_operation, RevisionOperation::Supersede);
2293 assert_eq!(
2294 active_head.revision_reason.as_deref(),
2295 Some("cropped and re-encoded")
2296 );
2297 assert_eq!(active_head.revision_causation_id, Some(original.id));
2298 assert_eq!(active_head.display_name.as_deref(), Some("frame-v2.png"));
2299
2300 let historical = get_resource(&store, original.id).await.unwrap().unwrap();
2301 assert_eq!(historical.superseded_by, Some(successor.id));
2302 assert_eq!(
2303 historical.revision_state_against(&active_head),
2304 RevisionState::Superseded
2305 );
2306
2307 let revisions = list_resource_revisions(&store, original.id).await.unwrap();
2308 assert_eq!(revisions.len(), 2);
2309 assert_eq!(revisions[0].id, original.id);
2310 assert_eq!(revisions[1].id, successor.id);
2311
2312 let historical_fetch = fetch_resource(&store, original.id, HydrationMode::Full)
2313 .await
2314 .unwrap()
2315 .unwrap();
2316 assert_eq!(historical_fetch.blob, Some(original_blob));
2317
2318 let head_fetch = fetch_resource(&store, successor.id, HydrationMode::Full)
2319 .await
2320 .unwrap()
2321 .unwrap();
2322 assert_eq!(head_fetch.blob, Some(successor_blob));
2323 }
2324
2325 #[tokio::test(flavor = "multi_thread")]
2326 async fn reconcile_resource_head_mutations_repairs_missing_backlink() {
2327 let store = MemoryStore::new();
2328 let original_blob = vec![1_u8; 16];
2329 let original = ResourceObject::builder()
2330 .modality(ModalityProfile::Document)
2331 .display_name("draft-v1.txt")
2332 .location(ResourceLocation::Blob { blob_index: 0 })
2333 .build()
2334 .unwrap();
2335 let original = persist_resource(&store, original, Some(original_blob))
2336 .await
2337 .unwrap();
2338
2339 let successor_blob = vec![2_u8; 24];
2340 let now = Timestamp::now();
2341 let mut successor =
2342 build_successor_revision(&original, Some("published revision".to_string()), now);
2343 successor.display_name = Some("draft-v2.txt".into());
2344 successor.location = ResourceLocation::Blob { blob_index: 0 };
2345 successor.size_bytes = successor_blob.len() as u64;
2346 successor.checksum = Some(format!("blake3:{}", blake3::hash(&successor_blob).to_hex()));
2347
2348 let envelope = build_resource_head_transition_envelope(&original, &successor).unwrap();
2349 crate::mutation_envelope_ops::append_mutation_envelope(&store, &envelope)
2350 .await
2351 .unwrap();
2352 append_resource_revision(&store, &successor, Some(successor_blob))
2353 .await
2354 .unwrap();
2355
2356 let reconciled = reconcile_resource_head_mutations(&store).await.unwrap();
2357 assert_eq!(reconciled, 1);
2358
2359 let original_after = get_resource(&store, original.id).await.unwrap().unwrap();
2360 assert_eq!(original_after.superseded_by, Some(successor.id));
2361
2362 let envelope_after = get_mutation_envelope(&store, &envelope.id)
2363 .await
2364 .unwrap()
2365 .unwrap();
2366 assert_eq!(envelope_after.state, MutationEnvelopeState::Applied);
2367 }
2368
2369 #[tokio::test(flavor = "multi_thread")]
2370 async fn supersede_resource_can_preserve_existing_blob_without_new_payload() {
2371 let store = MemoryStore::new();
2372 let blob = vec![3_u8; 24];
2373 let original = ResourceObject::builder()
2374 .modality(ModalityProfile::Document)
2375 .display_name("brief-v1.pdf")
2376 .location(ResourceLocation::Blob { blob_index: 0 })
2377 .build()
2378 .unwrap();
2379 let original = persist_resource(&store, original, Some(blob.clone()))
2380 .await
2381 .unwrap();
2382
2383 let successor = supersede_resource(
2384 &store,
2385 original.id,
2386 ResourceSupersession {
2387 reason: Some("metadata refresh".into()),
2388 display_name: Some("brief-v2.pdf".into()),
2389 ..ResourceSupersession::default()
2390 },
2391 None,
2392 )
2393 .await
2394 .unwrap();
2395
2396 let hydrated = fetch_resource(&store, successor.id, HydrationMode::Full)
2397 .await
2398 .unwrap()
2399 .unwrap();
2400 assert_eq!(hydrated.blob, Some(blob));
2401 assert_eq!(
2402 hydrated.resource.display_name.as_deref(),
2403 Some("brief-v2.pdf")
2404 );
2405 assert_eq!(hydrated.resource.revision_causation_id, Some(original.id));
2406 }
2407
2408 #[tokio::test(flavor = "multi_thread")]
2409 async fn persist_resource_does_not_deduplicate_to_superseded_checksum_match() {
2410 let store = MemoryStore::new();
2411 let original_blob = vec![1_u8; 16];
2412 let original = ResourceObject::builder()
2413 .modality(ModalityProfile::Image)
2414 .checksum(format!("blake3:{}", blake3::hash(&original_blob).to_hex()))
2415 .location(ResourceLocation::Blob { blob_index: 0 })
2416 .build()
2417 .unwrap();
2418 let original = persist_resource(&store, original, Some(original_blob.clone()))
2419 .await
2420 .unwrap();
2421
2422 supersede_resource(
2423 &store,
2424 original.id,
2425 ResourceSupersession {
2426 checksum: Some("blake3:replacement".into()),
2427 ..ResourceSupersession::default()
2428 },
2429 Some(vec![2_u8; 20]),
2430 )
2431 .await
2432 .unwrap();
2433
2434 let replacement_candidate = ResourceObject::builder()
2435 .modality(ModalityProfile::Image)
2436 .checksum(format!("blake3:{}", blake3::hash(&original_blob).to_hex()))
2437 .location(ResourceLocation::Blob { blob_index: 0 })
2438 .build()
2439 .unwrap();
2440 let replacement = persist_resource(&store, replacement_candidate, Some(original_blob))
2441 .await
2442 .unwrap();
2443
2444 assert_ne!(replacement.id, original.id);
2445 assert_ne!(
2446 replacement.logical_resource_id,
2447 original.logical_resource_id
2448 );
2449 }
2450
2451 #[tokio::test(flavor = "multi_thread")]
2452 async fn load_resource_blob_requires_visible_resource_metadata() {
2453 struct TestPolicy;
2454
2455 #[async_trait::async_trait]
2456 impl NamespacePolicy for TestPolicy {
2457 async fn allowed_namespaces(&self, principal: &str) -> Option<Vec<String>> {
2458 match principal {
2459 "allowed" => Some(vec!["default".to_string()]),
2460 "blocked" => Some(vec!["blocked".to_string()]),
2461 _ => Some(Vec::new()),
2462 }
2463 }
2464 }
2465
2466 let store = PolicyEnforcedStore::new(MemoryStore::new(), Arc::new(TestPolicy));
2467 let blob = vec![4_u8; 64];
2468 let resource = ResourceObject::builder()
2469 .modality(ModalityProfile::Image)
2470 .location(ResourceLocation::Blob { blob_index: 0 })
2471 .build()
2472 .unwrap();
2473 let resource = CURRENT_PRINCIPAL
2474 .scope("allowed".to_string(), async {
2475 persist_resource(&store, resource, Some(blob.clone())).await
2476 })
2477 .await
2478 .unwrap();
2479
2480 let visible = CURRENT_PRINCIPAL
2481 .scope("allowed".to_string(), async {
2482 load_resource_blob(&store, resource.id, 0).await
2483 })
2484 .await
2485 .unwrap();
2486 assert_eq!(visible, blob);
2487
2488 let denied = CURRENT_PRINCIPAL
2489 .scope("blocked".to_string(), async {
2490 load_resource_blob(&store, resource.id, 0).await
2491 })
2492 .await
2493 .unwrap_err();
2494 assert!(matches!(denied, HirnDbError::BlobError { .. }));
2495 }
2496
2497 #[tokio::test(flavor = "multi_thread")]
2498 async fn redact_resource_blocks_payload_hydration_and_keeps_placeholder_head() {
2499 let store = MemoryStore::new();
2500 let blob = vec![6_u8; 96];
2501 let resource = ResourceObject::builder()
2502 .modality(ModalityProfile::Document)
2503 .mime_type("application/pdf")
2504 .display_name("roadmap.pdf")
2505 .location(ResourceLocation::Blob { blob_index: 0 })
2506 .build()
2507 .unwrap();
2508 let resource = persist_resource(&store, resource, Some(blob.clone()))
2509 .await
2510 .unwrap();
2511
2512 let artifact = DerivedArtifact::builder()
2513 .resource_id(resource.id)
2514 .kind(DerivedArtifactKind::Preview)
2515 .modality(ModalityProfile::Text)
2516 .text_content("preview text")
2517 .build()
2518 .unwrap();
2519 persist_derived_artifact(&store, artifact).await.unwrap();
2520
2521 let redacted = redact_resource(
2522 &store,
2523 resource.id,
2524 ResourceGovernanceUpdate {
2525 reason: Some("contains sensitive evidence".into()),
2526 placeholder_display_name: Some("redacted evidence".into()),
2527 },
2528 )
2529 .await
2530 .unwrap();
2531
2532 assert_eq!(redacted.governance_state, ResourceGovernanceState::Redacted);
2533 assert_eq!(redacted.display_name.as_deref(), Some("redacted evidence"));
2534
2535 let historical = get_resource(&store, resource.id).await.unwrap().unwrap();
2536 assert_eq!(
2537 historical.governance_state,
2538 ResourceGovernanceState::Redacted
2539 );
2540 assert_eq!(
2541 historical.display_name.as_deref(),
2542 Some("redacted evidence")
2543 );
2544 assert!(historical.mime_type.is_none());
2545 assert_eq!(historical.size_bytes, 0);
2546
2547 let preview = fetch_resource(&store, resource.id, HydrationMode::Preview)
2548 .await
2549 .unwrap()
2550 .unwrap();
2551 assert!(preview.artifacts.is_empty());
2552 assert!(preview.blob.is_none());
2553
2554 let full = fetch_resource(&store, redacted.id, HydrationMode::Full)
2555 .await
2556 .unwrap()
2557 .unwrap();
2558 assert!(full.artifacts.is_empty());
2559 assert!(full.blob.is_none());
2560
2561 let blob_err = load_resource_blob(&store, resource.id, 0)
2562 .await
2563 .unwrap_err();
2564 assert!(matches!(blob_err, HirnDbError::BlobError { .. }));
2565
2566 let remaining_blobs = store
2567 .scan(blob_ds::DATASET_NAME, ScanOptions::default())
2568 .await
2569 .unwrap();
2570 let remaining_artifacts = store
2571 .scan(artifact_ds::DATASET_NAME, ScanOptions::default())
2572 .await
2573 .unwrap();
2574 assert_eq!(
2575 remaining_blobs
2576 .iter()
2577 .map(|batch| batch.num_rows())
2578 .sum::<usize>(),
2579 0
2580 );
2581 assert_eq!(
2582 remaining_artifacts
2583 .iter()
2584 .map(|batch| batch.num_rows())
2585 .sum::<usize>(),
2586 0
2587 );
2588 }
2589
2590 #[tokio::test(flavor = "multi_thread")]
2591 async fn purge_resource_marks_lineage_as_purged() {
2592 let store = MemoryStore::new();
2593 let blob = vec![2_u8; 40];
2594 let resource = ResourceObject::builder()
2595 .modality(ModalityProfile::Image)
2596 .display_name("frame.png")
2597 .location(ResourceLocation::Blob { blob_index: 0 })
2598 .build()
2599 .unwrap();
2600 let resource = persist_resource(&store, resource, Some(blob))
2601 .await
2602 .unwrap();
2603
2604 let purged = purge_resource(&store, resource.id, ResourceGovernanceUpdate::default())
2605 .await
2606 .unwrap();
2607 assert_eq!(purged.governance_state, ResourceGovernanceState::Purged);
2608 assert_eq!(purged.display_name.as_deref(), Some("purged resource"));
2609
2610 let head = get_resource_head(&store, resource.id)
2611 .await
2612 .unwrap()
2613 .unwrap();
2614 assert_eq!(head.id, purged.id);
2615 assert_eq!(head.governance_state, ResourceGovernanceState::Purged);
2616
2617 let historical = get_resource(&store, resource.id).await.unwrap().unwrap();
2618 assert_eq!(historical.governance_state, ResourceGovernanceState::Purged);
2619 assert!(historical.mime_type.is_none());
2620 assert_eq!(historical.size_bytes, 0);
2621
2622 let revisions = list_resource_revisions(&store, resource.id).await.unwrap();
2623 assert_eq!(revisions.len(), 2);
2624 assert_eq!(revisions[0].id, resource.id);
2625 assert_eq!(revisions[1].id, purged.id);
2626 }
2627
2628 #[tokio::test(flavor = "multi_thread")]
2629 async fn retention_policy_targets_modality_and_classification() {
2630 let store = MemoryStore::new();
2631
2632 let image_restricted = persist_resource(
2633 &store,
2634 ResourceObject::builder()
2635 .modality(ModalityProfile::Image)
2636 .metadata_entry("classification", "restricted")
2637 .location(ResourceLocation::Blob { blob_index: 0 })
2638 .build()
2639 .unwrap(),
2640 Some(vec![1_u8; 24]),
2641 )
2642 .await
2643 .unwrap();
2644 let image_public = persist_resource(
2645 &store,
2646 ResourceObject::builder()
2647 .modality(ModalityProfile::Image)
2648 .metadata_entry("classification", "public")
2649 .location(ResourceLocation::Blob { blob_index: 0 })
2650 .build()
2651 .unwrap(),
2652 Some(vec![2_u8; 24]),
2653 )
2654 .await
2655 .unwrap();
2656 let document_restricted = persist_resource(
2657 &store,
2658 ResourceObject::builder()
2659 .modality(ModalityProfile::Document)
2660 .metadata_entry("classification", "restricted")
2661 .location(ResourceLocation::Blob { blob_index: 0 })
2662 .build()
2663 .unwrap(),
2664 Some(vec![3_u8; 24]),
2665 )
2666 .await
2667 .unwrap();
2668 let document_public = persist_resource(
2669 &store,
2670 ResourceObject::builder()
2671 .modality(ModalityProfile::Document)
2672 .metadata_entry("classification", "public")
2673 .location(ResourceLocation::Blob { blob_index: 0 })
2674 .build()
2675 .unwrap(),
2676 Some(vec![4_u8; 24]),
2677 )
2678 .await
2679 .unwrap();
2680
2681 let policy = ResourceRetentionPolicy::default()
2682 .with_rule(
2683 ResourceRetentionRule::new(ResourceRetentionAction::Redact)
2684 .classification("restricted"),
2685 )
2686 .with_rule(
2687 ResourceRetentionRule::new(ResourceRetentionAction::Purge)
2688 .modality(ModalityProfile::Image),
2689 );
2690
2691 let result = apply_resource_retention_policy(&store, &policy)
2692 .await
2693 .unwrap();
2694 assert_eq!(result.scanned_active_heads, 4);
2695 assert_eq!(result.governed_resources, 3);
2696 assert_eq!(result.redacted_resources, 1);
2697 assert_eq!(result.purged_resources, 2);
2698 assert_eq!(result.skipped_resources, 0);
2699
2700 let image_restricted = get_resource_head(&store, image_restricted.id)
2701 .await
2702 .unwrap()
2703 .unwrap();
2704 assert_eq!(
2705 image_restricted.governance_state,
2706 ResourceGovernanceState::Purged
2707 );
2708
2709 let image_public = get_resource_head(&store, image_public.id)
2710 .await
2711 .unwrap()
2712 .unwrap();
2713 assert_eq!(
2714 image_public.governance_state,
2715 ResourceGovernanceState::Purged
2716 );
2717
2718 let document_restricted = get_resource_head(&store, document_restricted.id)
2719 .await
2720 .unwrap()
2721 .unwrap();
2722 assert_eq!(
2723 document_restricted.governance_state,
2724 ResourceGovernanceState::Redacted
2725 );
2726
2727 let document_public = get_resource_head(&store, document_public.id)
2728 .await
2729 .unwrap()
2730 .unwrap();
2731 assert_eq!(
2732 document_public.governance_state,
2733 ResourceGovernanceState::Active
2734 );
2735 }
2736
2737 #[tokio::test(flavor = "multi_thread")]
2738 async fn persist_resource_with_quota_policy_blocks_namespace_limit() {
2739 let store = MemoryStore::new();
2740 let namespace = Namespace::new("quota-ns").unwrap();
2741 let policy = ResourceQuotaPolicy::default().with_rule(
2742 ResourceQuotaRule::new(ResourceQuotaScope::Namespace(namespace))
2743 .max_active_resources(1),
2744 );
2745
2746 let first = ResourceObject::builder()
2747 .modality(ModalityProfile::Document)
2748 .location(ResourceLocation::Blob { blob_index: 0 })
2749 .namespace(namespace)
2750 .build()
2751 .unwrap();
2752 persist_resource_with_quota_policy(&store, first, Some(vec![1_u8; 16]), &policy)
2753 .await
2754 .unwrap();
2755
2756 let second = ResourceObject::builder()
2757 .modality(ModalityProfile::Document)
2758 .location(ResourceLocation::Blob { blob_index: 0 })
2759 .namespace(namespace)
2760 .build()
2761 .unwrap();
2762 let error =
2763 persist_resource_with_quota_policy(&store, second, Some(vec![2_u8; 16]), &policy)
2764 .await
2765 .unwrap_err();
2766
2767 assert!(
2768 matches!(error, HirnDbError::LimitExceeded(message) if message.contains("namespace `quota-ns`") && message.contains("active resources"))
2769 );
2770 }
2771
2772 #[tokio::test(flavor = "multi_thread")]
2773 async fn supersede_resource_with_quota_policy_reuses_the_active_head_slot() {
2774 let store = MemoryStore::new();
2775 let namespace = Namespace::new("quota-replace").unwrap();
2776 let policy = ResourceQuotaPolicy::default().with_rule(
2777 ResourceQuotaRule::new(ResourceQuotaScope::Namespace(namespace))
2778 .max_active_resources(1),
2779 );
2780
2781 let original = ResourceObject::builder()
2782 .modality(ModalityProfile::Document)
2783 .location(ResourceLocation::Blob { blob_index: 0 })
2784 .namespace(namespace)
2785 .build()
2786 .unwrap();
2787 let original =
2788 persist_resource_with_quota_policy(&store, original, Some(vec![1_u8; 16]), &policy)
2789 .await
2790 .unwrap();
2791
2792 let successor = supersede_resource_with_quota_policy(
2793 &store,
2794 original.id,
2795 ResourceSupersession {
2796 display_name: Some("replacement.pdf".into()),
2797 ..Default::default()
2798 },
2799 Some(vec![2_u8; 24]),
2800 &policy,
2801 )
2802 .await
2803 .unwrap();
2804
2805 let head = get_resource_head(&store, original.id)
2806 .await
2807 .unwrap()
2808 .unwrap();
2809 assert_eq!(head.id, successor.id);
2810 assert_eq!(head.display_name.as_deref(), Some("replacement.pdf"));
2811 let revisions = list_resource_revisions(&store, original.id).await.unwrap();
2812 assert_eq!(revisions.len(), 2);
2813 }
2814}