Skip to main content

hirn_storage/
resource_ops.rs

1use std::collections::BTreeMap;
2use std::io::Cursor;
3
4use arrow_array::{Array, BinaryArray};
5use image::ImageFormat;
6
7use hirn_core::metadata::Metadata;
8use hirn_core::types::{AgentId, Namespace};
9use hirn_core::{
10    DerivedArtifact, DerivedArtifactKind, EvidenceRole, HydrationMode, LogicalResourceId,
11    ModalityProfile, ResourceGovernanceState, ResourceId, ResourceLocation, ResourceObject,
12    ResourceQuotaPolicy, ResourceQuotaScope, ResourceRetentionAction, ResourceRetentionPolicy,
13    ResourceRevisionId, RevisionOperation, Timestamp,
14};
15
16use crate::HirnDbError;
17use crate::datasets::{derived_artifact as artifact_ds, resource_blob as blob_ds, resource_object};
18use crate::mutation_envelope_ops::{
19    MutationEnvelopeRecord, MutationEnvelopeState, list_pending_mutation_envelopes,
20    update_mutation_envelope_state,
21};
22use crate::store::{PhysicalStore, ScanOptions};
23
24pub const RESOURCE_HEAD_TRANSITION_KIND: &str = "resource_head_transition";
25
26#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27struct ResourceHeadTransitionEnvelope {
28    current_id: ResourceId,
29    successor_id: ResourceId,
30    successor_created_at_ms: i64,
31}
32
33/// Patch-style update describing a superseding resource revision.
34#[derive(Debug, Clone, Default, PartialEq)]
35pub struct ResourceSupersession {
36    pub reason: Option<String>,
37    pub modality: Option<ModalityProfile>,
38    pub mime_type: Option<String>,
39    pub display_name: Option<String>,
40    pub checksum: Option<String>,
41    pub size_bytes: Option<u64>,
42    pub location: Option<ResourceLocation>,
43    pub metadata: Option<Metadata>,
44}
45
/// Lineage-preserving governance update for a resource head.
///
/// Passed to redaction/purge operations; both fields are optional.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ResourceGovernanceUpdate {
    /// Optional human-readable reason for the governance action.
    pub reason: Option<String>,
    /// Optional display name for the placeholder revision.
    pub placeholder_display_name: Option<String>,
}
52
53/// Hydrated resource payload returned from storage fetch operations.
54#[derive(Debug, Clone, PartialEq)]
55pub struct HydratedResource {
56    pub resource: ResourceObject,
57    pub artifacts: Vec<DerivedArtifact>,
58    pub blob: Option<Vec<u8>>,
59}
60
/// Outcome summary for an operator-triggered retention pass.
///
/// All counters start at zero (`Default`) and are incremented per resource head.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResourceRetentionApplyResult {
    /// Active resource heads examined by the pass.
    pub scanned_active_heads: usize,
    /// Heads that received a governance action (redacted + purged).
    pub governed_resources: usize,
    /// Heads that were redacted.
    pub redacted_resources: usize,
    /// Heads that were purged.
    pub purged_resources: usize,
    /// Heads matched by the policy but already in the target (or purged) state.
    pub skipped_resources: usize,
}
70
71/// Persist a resource object plus its blob payload when applicable.
72///
73/// When a checksum is present, storage deduplicates within the same namespace
74/// and returns the existing resource head instead of creating a duplicate.
75pub async fn persist_resource(
76    store: &dyn PhysicalStore,
77    resource: ResourceObject,
78    blob: Option<Vec<u8>>,
79) -> Result<ResourceObject, HirnDbError> {
80    persist_resource_inner(store, resource, blob, None).await
81}
82
83/// Persist a resource while enforcing a configured quota policy.
84pub async fn persist_resource_with_quota_policy(
85    store: &dyn PhysicalStore,
86    resource: ResourceObject,
87    blob: Option<Vec<u8>>,
88    quota_policy: &ResourceQuotaPolicy,
89) -> Result<ResourceObject, HirnDbError> {
90    persist_resource_inner(store, resource, blob, Some(quota_policy)).await
91}
92
93/// Build a canonical blob-backed resource object for live ingest paths.
94pub fn build_configured_blob_resource<F>(
95    namespace: Namespace,
96    owner_agent_id: AgentId,
97    modality: ModalityProfile,
98    mime_type: Option<&str>,
99    data: &[u8],
100    configure: F,
101) -> Result<ResourceObject, HirnDbError>
102where
103    F: FnOnce(
104        hirn_core::resource::ResourceObjectBuilder,
105    ) -> hirn_core::resource::ResourceObjectBuilder,
106{
107    let checksum = format!("blake3:{}", blake3::hash(data).to_hex());
108    let mut builder = ResourceObject::builder()
109        .modality(modality)
110        .checksum(checksum)
111        .size_bytes(data.len() as u64)
112        .location(ResourceLocation::Blob { blob_index: 0 })
113        .owner_agent_id(owner_agent_id)
114        .namespace(namespace);
115    if let Some(mime_type) = mime_type {
116        builder = builder.mime_type(mime_type);
117    }
118
119    configure(builder)
120        .build()
121        .map_err(|error| HirnDbError::InvalidArgument(error.to_string()))
122}
123
124/// Add standardized audio transport metadata to a resource builder.
125pub fn configure_audio_resource_builder(
126    builder: hirn_core::resource::ResourceObjectBuilder,
127    duration_ms: u64,
128    channel_count: Option<u16>,
129) -> hirn_core::resource::ResourceObjectBuilder {
130    let mut builder = builder.metadata_entry(
131        "duration_ms",
132        i64::try_from(duration_ms).unwrap_or(i64::MAX),
133    );
134    if let Some(channel_count) = channel_count {
135        builder = builder.metadata_entry("channel_count", i64::from(channel_count));
136    }
137    builder
138}
139
140async fn persist_resource_inner(
141    store: &dyn PhysicalStore,
142    resource: ResourceObject,
143    blob: Option<Vec<u8>>,
144    quota_policy: Option<&ResourceQuotaPolicy>,
145) -> Result<ResourceObject, HirnDbError> {
146    let mut resource = resource;
147    let blob = prepare_blob_payload(store, None, &mut resource, blob).await?;
148
149    if let Some(checksum) = resource.checksum.as_deref()
150        && let Some(existing) =
151            find_live_resource_by_checksum(store, resource.namespace, checksum).await?
152    {
153        return Ok(existing);
154    }
155
156    if let Some(quota_policy) = quota_policy {
157        enforce_resource_quota_policy(store, &resource, None, quota_policy).await?;
158    }
159
160    append_resource_revision(store, &resource, blob).await?;
161
162    Ok(resource)
163}
164
165/// List all revisions in the lineage containing the provided resource revision.
166pub async fn list_resource_revisions(
167    store: &dyn PhysicalStore,
168    resource_id: ResourceId,
169) -> Result<Vec<ResourceObject>, HirnDbError> {
170    let Some(resource) = get_resource_raw(store, resource_id).await? else {
171        return Ok(Vec::new());
172    };
173
174    list_resource_revisions_for_logical_id(store, resource.logical_resource_id).await
175}
176
177/// Resolve the active resource head for the lineage containing the provided revision.
178pub async fn get_resource_head(
179    store: &dyn PhysicalStore,
180    resource_id: ResourceId,
181) -> Result<Option<ResourceObject>, HirnDbError> {
182    let revisions = list_resource_revisions(store, resource_id).await?;
183    Ok(select_active_resource_head(&revisions))
184}
185
186/// Append a superseding resource revision while preserving historical lookups.
187pub async fn supersede_resource(
188    store: &dyn PhysicalStore,
189    resource_id: ResourceId,
190    supersession: ResourceSupersession,
191    blob: Option<Vec<u8>>,
192) -> Result<ResourceObject, HirnDbError> {
193    supersede_resource_inner(store, resource_id, supersession, blob, None).await
194}
195
196/// Append a superseding resource revision while enforcing a configured quota policy.
197pub async fn supersede_resource_with_quota_policy(
198    store: &dyn PhysicalStore,
199    resource_id: ResourceId,
200    supersession: ResourceSupersession,
201    blob: Option<Vec<u8>>,
202    quota_policy: &ResourceQuotaPolicy,
203) -> Result<ResourceObject, HirnDbError> {
204    supersede_resource_inner(store, resource_id, supersession, blob, Some(quota_policy)).await
205}
206
207async fn supersede_resource_inner(
208    store: &dyn PhysicalStore,
209    resource_id: ResourceId,
210    supersession: ResourceSupersession,
211    blob: Option<Vec<u8>>,
212    quota_policy: Option<&ResourceQuotaPolicy>,
213) -> Result<ResourceObject, HirnDbError> {
214    let Some(current) = get_resource_head(store, resource_id).await? else {
215        return Err(HirnDbError::InvalidArgument(format!(
216            "resource not found: {resource_id}"
217        )));
218    };
219
220    let now = Timestamp::now();
221    let mut successor = build_successor_revision(
222        &current,
223        normalize_optional_string(supersession.reason),
224        now,
225    );
226
227    if let Some(modality) = supersession.modality {
228        successor.modality = modality;
229    }
230    if let Some(mime_type) = supersession.mime_type {
231        successor.mime_type = Some(mime_type);
232    }
233    if let Some(display_name) = supersession.display_name {
234        successor.display_name = Some(display_name);
235    }
236    if let Some(checksum) = supersession.checksum {
237        successor.checksum = Some(checksum);
238    }
239    if let Some(size_bytes) = supersession.size_bytes {
240        successor.size_bytes = size_bytes;
241    }
242    if let Some(location) = supersession.location {
243        successor.location = location;
244    }
245    if let Some(metadata) = supersession.metadata {
246        successor.metadata = metadata;
247    }
248
249    let blob = prepare_blob_payload(store, Some(&current), &mut successor, blob).await?;
250    if let Some(quota_policy) = quota_policy {
251        enforce_resource_quota_policy(store, &successor, Some(&current), quota_policy).await?;
252    }
253    let envelope = build_resource_head_transition_envelope(&current, &successor)?;
254    crate::mutation_envelope_ops::append_mutation_envelope(store, &envelope).await?;
255    if let Err(error) = append_resource_revision(store, &successor, blob).await {
256        let _ = mark_resource_head_transition_failed(store, &envelope.id, &error).await;
257        return Err(error);
258    }
259
260    let mut updated_current = current.clone();
261    updated_current.superseded_by = Some(successor.id);
262    updated_current.updated_at = now;
263    if let Err(error) = upsert_resource_revision(store, &updated_current).await {
264        rollback_resource_revision(store, &successor).await;
265        let _ = mark_resource_head_transition_failed(store, &envelope.id, &error).await;
266        return Err(error);
267    }
268
269    update_mutation_envelope_state(store, &envelope.id, MutationEnvelopeState::Applied, None)
270        .await?;
271
272    Ok(successor)
273}
274
275pub async fn reconcile_resource_head_mutations(
276    store: &dyn PhysicalStore,
277) -> Result<usize, HirnDbError> {
278    let envelopes =
279        list_pending_mutation_envelopes(store, Some(RESOURCE_HEAD_TRANSITION_KIND)).await?;
280    let mut reconciled = 0usize;
281
282    for envelope in envelopes {
283        match reconcile_single_resource_head_transition(store, &envelope).await {
284            Ok(true) => reconciled += 1,
285            Ok(false) => {}
286            Err(error) => {
287                let _ = mark_resource_head_transition_failed(store, &envelope.id, &error).await;
288            }
289        }
290    }
291
292    Ok(reconciled)
293}
294
295/// Reconcile resources that were left in `storage_ready = false` staging state by a
296/// previous crash or interrupted blob write.
297///
298/// For each staging record the reconciler checks whether the accompanying blob row
299/// exists in `_resource_blobs`.  If the blob is present the record is finalized by
300/// flipping `storage_ready = true` via `merge_insert`.  If the blob is absent the
301/// partial record is deleted (preventing a dangling metadata row that would never
302/// become visible).
303///
304/// Called once during `HirnDB::open_with_config()` after dataset initialization.
305pub async fn reconcile_pending_resource_blob_staging(
306    store: &dyn PhysicalStore,
307) -> Result<usize, HirnDbError> {
308    // Scan for all resource rows — filter must not be pushed to the physical store
309    // because `get_resource_raw` already filters on `storage_ready = true`.  We
310    // need the raw rows regardless of that flag.
311    let filter = "storage_ready = false".to_string();
312    let batches = store
313        .scan(
314            resource_object::DATASET_NAME,
315            ScanOptions {
316                filter: Some(filter),
317                exact_filter: None,
318                columns: None,
319                order_by: None,
320                limit: None,
321                offset: None,
322            },
323        )
324        .await?;
325
326    let mut staging_records: Vec<ResourceObject> = Vec::new();
327    for batch in &batches {
328        staging_records.extend(
329            resource_object::from_batch(batch)?
330                .into_iter()
331                .filter(|r| !r.storage_ready),
332        );
333    }
334
335    if staging_records.is_empty() {
336        return Ok(0);
337    }
338
339    let mut reconciled = 0_usize;
340    for mut resource in staging_records {
341        let blob_index = match resource.location {
342            ResourceLocation::Blob { blob_index } => blob_index,
343            _ => {
344                // Metadata-only staging record with no blob — finalize it directly.
345                resource.storage_ready = true;
346                let _ = upsert_resource_revision(store, &resource).await;
347                reconciled += 1;
348                continue;
349            }
350        };
351
352        // Check whether the blob was already written.
353        match load_resource_blob_unchecked(store, resource.id, blob_index).await {
354            Ok(_) => {
355                // Blob is present — finalize the metadata record.
356                resource.storage_ready = true;
357                match upsert_resource_revision(store, &resource).await {
358                    Ok(()) => {
359                        tracing::debug!(
360                            resource_id = %resource.id,
361                            "reconciled staging resource: blob present, finalized"
362                        );
363                        reconciled += 1;
364                    }
365                    Err(error) => {
366                        tracing::warn!(
367                            resource_id = %resource.id,
368                            %error,
369                            "reconcile: failed to finalize staged resource"
370                        );
371                    }
372                }
373            }
374            Err(_) => {
375                // Blob is absent — the write was interrupted before the blob was
376                // persisted.  Delete the dangling staging row.
377                let filter = format!("id = '{}'", resource.id);
378                if let Err(error) = store.delete(resource_object::DATASET_NAME, &filter).await {
379                    tracing::warn!(
380                        resource_id = %resource.id,
381                        %error,
382                        "reconcile: failed to delete dangling staging resource row"
383                    );
384                } else {
385                    tracing::debug!(
386                        resource_id = %resource.id,
387                        "reconcile: deleted dangling staging resource (no blob found)"
388                    );
389                    reconciled += 1;
390                }
391            }
392        }
393    }
394
395    Ok(reconciled)
396}
397
398fn build_resource_head_transition_envelope(
399    current: &ResourceObject,
400    successor: &ResourceObject,
401) -> Result<MutationEnvelopeRecord, HirnDbError> {
402    let payload = ResourceHeadTransitionEnvelope {
403        current_id: current.id,
404        successor_id: successor.id,
405        successor_created_at_ms: successor.created_at.timestamp_ms(),
406    };
407
408    let payload = serde_json::to_vec(&payload).map_err(|error| {
409        HirnDbError::InvalidArgument(format!("resource head envelope serialize: {error}"))
410    })?;
411
412    Ok(MutationEnvelopeRecord::pending(
413        format!("resource-head:{}", successor.id),
414        RESOURCE_HEAD_TRANSITION_KIND,
415        payload,
416    ))
417}
418
419async fn reconcile_single_resource_head_transition(
420    store: &dyn PhysicalStore,
421    envelope: &MutationEnvelopeRecord,
422) -> Result<bool, HirnDbError> {
423    let payload: ResourceHeadTransitionEnvelope = serde_json::from_slice(&envelope.payload)
424        .map_err(|error| {
425            HirnDbError::InvalidArgument(format!("resource head envelope deserialize: {error}"))
426        })?;
427
428    let current = get_resource_raw(store, payload.current_id).await?;
429    let successor = get_resource_raw(store, payload.successor_id).await?;
430
431    match (current, successor) {
432        (Some(current), Some(successor)) if current.superseded_by == Some(successor.id) => {
433            update_mutation_envelope_state(
434                store,
435                &envelope.id,
436                MutationEnvelopeState::Applied,
437                None,
438            )
439            .await?;
440            Ok(false)
441        }
442        (Some(mut current), Some(successor)) => {
443            current.superseded_by = Some(successor.id);
444            current.updated_at = Timestamp::from_millis(
445                u64::try_from(payload.successor_created_at_ms).map_err(|_| {
446                    HirnDbError::InvalidArgument(
447                        "resource head envelope successor_created_at_ms was negative".into(),
448                    )
449                })?,
450            );
451            upsert_resource_revision(store, &current).await?;
452            update_mutation_envelope_state(
453                store,
454                &envelope.id,
455                MutationEnvelopeState::Applied,
456                None,
457            )
458            .await?;
459            Ok(true)
460        }
461        (Some(mut current), None) => {
462            if current.superseded_by == Some(payload.successor_id) {
463                current.superseded_by = None;
464                upsert_resource_revision(store, &current).await?;
465            }
466            mark_resource_head_transition_failed(
467                store,
468                &envelope.id,
469                &HirnDbError::InvalidArgument(format!(
470                    "resource head recovery missing successor revision: {}",
471                    payload.successor_id
472                )),
473            )
474            .await?;
475            Ok(true)
476        }
477        (None, Some(successor)) => {
478            rollback_resource_revision(store, &successor).await;
479            mark_resource_head_transition_failed(
480                store,
481                &envelope.id,
482                &HirnDbError::InvalidArgument(format!(
483                    "resource head recovery missing current revision: {}",
484                    payload.current_id
485                )),
486            )
487            .await?;
488            Ok(true)
489        }
490        (None, None) => {
491            mark_resource_head_transition_failed(
492                store,
493                &envelope.id,
494                &HirnDbError::InvalidArgument(format!(
495                    "resource head recovery missing both revisions: {} -> {}",
496                    payload.current_id, payload.successor_id
497                )),
498            )
499            .await?;
500            Ok(false)
501        }
502    }
503}
504
505async fn mark_resource_head_transition_failed(
506    store: &dyn PhysicalStore,
507    envelope_id: &str,
508    error: &HirnDbError,
509) -> Result<(), HirnDbError> {
510    update_mutation_envelope_state(
511        store,
512        envelope_id,
513        MutationEnvelopeState::Failed,
514        Some(error.to_string()),
515    )
516    .await
517}
518
519/// Create a lineage-preserving redacted placeholder and block future payload hydration.
520pub async fn redact_resource(
521    store: &dyn PhysicalStore,
522    resource_id: ResourceId,
523    update: ResourceGovernanceUpdate,
524) -> Result<ResourceObject, HirnDbError> {
525    govern_resource(
526        store,
527        resource_id,
528        ResourceGovernanceState::Redacted,
529        update,
530    )
531    .await
532}
533
534/// Create a lineage-preserving purged placeholder and block future payload hydration.
535pub async fn purge_resource(
536    store: &dyn PhysicalStore,
537    resource_id: ResourceId,
538    update: ResourceGovernanceUpdate,
539) -> Result<ResourceObject, HirnDbError> {
540    govern_resource(store, resource_id, ResourceGovernanceState::Purged, update).await
541}
542
543/// Apply an operator-configured retention policy to active resource heads.
544pub async fn apply_resource_retention_policy(
545    store: &dyn PhysicalStore,
546    policy: &ResourceRetentionPolicy,
547) -> Result<ResourceRetentionApplyResult, HirnDbError> {
548    if policy.is_empty() {
549        return Ok(ResourceRetentionApplyResult::default());
550    }
551
552    let mut result = ResourceRetentionApplyResult::default();
553    for resource in list_active_resource_heads(store).await? {
554        result.scanned_active_heads += 1;
555
556        let Some(action) = policy.strongest_action_for(&resource) else {
557            continue;
558        };
559
560        if resource.governance_state == governance_state_for_action(action)
561            || resource.governance_state == ResourceGovernanceState::Purged
562        {
563            result.skipped_resources += 1;
564            continue;
565        }
566
567        let update = ResourceGovernanceUpdate {
568            reason: Some(format!("retention policy {}", action.as_str())),
569            placeholder_display_name: None,
570        };
571        match action {
572            ResourceRetentionAction::Redact => {
573                redact_resource(store, resource.id, update).await?;
574                result.redacted_resources += 1;
575            }
576            ResourceRetentionAction::Purge => {
577                purge_resource(store, resource.id, update).await?;
578                result.purged_resources += 1;
579            }
580        }
581        result.governed_resources += 1;
582    }
583
584    Ok(result)
585}
586
587/// Persist a derived artifact for an existing resource.
588pub async fn persist_derived_artifact(
589    store: &dyn PhysicalStore,
590    artifact: DerivedArtifact,
591) -> Result<(), HirnDbError> {
592    let batch = artifact_ds::to_batch(std::slice::from_ref(&artifact))?;
593    store.append(artifact_ds::DATASET_NAME, batch).await
594}
595
/// Inputs available to the shared derived-artifact planner.
///
/// Borrowed views over ingest data. The text surrogate is always present (it may be
/// empty); the raw blob and its MIME type are optional.
#[derive(Debug, Clone, Copy, Default)]
pub struct DerivedArtifactInput<'a> {
    /// Text used for text-backed derived artifacts.
    pub text_content: &'a str,
    /// Raw source bytes for binary derived artifacts, when available.
    pub blob_bytes: Option<&'a [u8]>,
    /// MIME type describing `blob_bytes`, when known.
    pub mime_type: Option<&'a str>,
}

impl<'a> DerivedArtifactInput<'a> {
    /// Create an input carrying only `text_content`, with no blob attached.
    #[must_use]
    pub const fn new(text_content: &'a str) -> Self {
        Self {
            blob_bytes: None,
            mime_type: None,
            text_content,
        }
    }

    /// Attach blob bytes and their MIME type, replacing any previous values
    /// (including resetting the MIME type when `mime_type` is `None`).
    #[must_use]
    pub const fn with_blob(self, blob_bytes: &'a [u8], mime_type: Option<&'a str>) -> Self {
        Self {
            blob_bytes: Some(blob_bytes),
            mime_type,
            ..self
        }
    }
}
621
622#[derive(Debug, Clone, Copy, PartialEq, Eq)]
623struct DefaultTextArtifactPlan {
624    kind: DerivedArtifactKind,
625    record_failure: bool,
626}
627
628impl DefaultTextArtifactPlan {
629    const fn new(kind: DerivedArtifactKind, record_failure: bool) -> Self {
630        Self {
631            kind,
632            record_failure,
633        }
634    }
635}
636
637#[derive(Debug, Clone, Copy, PartialEq, Eq)]
638struct DefaultBlobArtifactPlan {
639    kind: DerivedArtifactKind,
640    record_failure: bool,
641}
642
643impl DefaultBlobArtifactPlan {
644    const fn new(kind: DerivedArtifactKind, record_failure: bool) -> Self {
645        Self {
646            kind,
647            record_failure,
648        }
649    }
650}
651
652const IMAGE_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 2] = [
653    DefaultTextArtifactPlan::new(DerivedArtifactKind::Caption, true),
654    DefaultTextArtifactPlan::new(DerivedArtifactKind::OcrText, false),
655];
656const IMAGE_SOURCE_BLOB_ARTIFACTS: [DefaultBlobArtifactPlan; 1] = [DefaultBlobArtifactPlan::new(
657    DerivedArtifactKind::Thumbnail,
658    true,
659)];
660const AUDIO_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] = [DefaultTextArtifactPlan::new(
661    DerivedArtifactKind::Transcript,
662    true,
663)];
664const CODE_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] = [DefaultTextArtifactPlan::new(
665    DerivedArtifactKind::SyntaxSummary,
666    true,
667)];
668const STRUCTURED_SOURCE_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] =
669    [DefaultTextArtifactPlan::new(
670        DerivedArtifactKind::SchemaSummary,
671        true,
672    )];
673const PREVIEW_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 1] = [DefaultTextArtifactPlan::new(
674    DerivedArtifactKind::Preview,
675    true,
676)];
677const NO_TEXT_ARTIFACTS: [DefaultTextArtifactPlan; 0] = [];
678const NO_BLOB_ARTIFACTS: [DefaultBlobArtifactPlan; 0] = [];
679const THUMBNAIL_MAX_DIMENSION_PX: u32 = 256;
680
681/// Persist deterministic derived artifacts from already-available ingest inputs.
682pub async fn persist_default_derived_artifacts(
683    store: &dyn PhysicalStore,
684    resource: &ResourceObject,
685    role: EvidenceRole,
686    input: DerivedArtifactInput<'_>,
687) -> Result<Vec<DerivedArtifact>, HirnDbError> {
688    let text_plans = default_text_artifact_plan(resource.modality, role);
689    let blob_plans = default_blob_artifact_plan(resource.modality, role);
690    if text_plans.is_empty() && blob_plans.is_empty() {
691        return Ok(Vec::new());
692    }
693
694    let mut known_artifacts = list_derived_artifacts(store, resource.id).await?;
695    let text_content = input.text_content.trim();
696    let mut created = Vec::new();
697
698    for plan in text_plans {
699        if known_artifacts
700            .iter()
701            .any(|artifact| artifact.kind == plan.kind)
702            || known_artifacts
703                .iter()
704                .any(|artifact| artifact_failure_matches(artifact, plan.kind))
705        {
706            continue;
707        }
708
709        let artifact = if text_content.is_empty() {
710            if !plan.record_failure {
711                continue;
712            }
713
714            build_generation_failure_artifact(resource, role, plan.kind, "source text was empty")?
715        } else {
716            let mut builder = DerivedArtifact::builder()
717                .resource_id(resource.id)
718                .kind(plan.kind)
719                .modality(ModalityProfile::Text)
720                .text_content(text_content)
721                .namespace(resource.namespace);
722
723            if resource.modality == ModalityProfile::Image
724                && role == EvidenceRole::Source
725                && plan.kind == DerivedArtifactKind::OcrText
726            {
727                builder = builder
728                    .metadata_entry("generation_strategy", "text_surrogate_fallback")
729                    .metadata_entry("fallback_source", "image_description");
730            }
731
732            builder
733                .build()
734                .map_err(|error| HirnDbError::InvalidArgument(error.to_string()))?
735        };
736
737        persist_derived_artifact(store, artifact.clone()).await?;
738        known_artifacts.push(artifact.clone());
739        created.push(artifact);
740    }
741
742    for plan in blob_plans {
743        if known_artifacts
744            .iter()
745            .any(|artifact| artifact.kind == plan.kind)
746            || known_artifacts
747                .iter()
748                .any(|artifact| artifact_failure_matches(artifact, plan.kind))
749        {
750            continue;
751        }
752
753        let artifact = match input.blob_bytes {
754            Some(blob_bytes) if !blob_bytes.is_empty() => {
755                match build_binary_derived_artifact(
756                    resource,
757                    plan.kind,
758                    blob_bytes,
759                    input.mime_type,
760                    &known_artifacts,
761                ) {
762                    Ok((artifact, blob_bytes)) => {
763                        persist_derived_artifact_with_blob(store, artifact.clone(), blob_bytes)
764                            .await?;
765                        artifact
766                    }
767                    Err(error) if plan.record_failure => {
768                        let failure =
769                            build_generation_failure_artifact(resource, role, plan.kind, &error)?;
770                        persist_derived_artifact(store, failure.clone()).await?;
771                        failure
772                    }
773                    Err(_) => continue,
774                }
775            }
776            _ if plan.record_failure => {
777                let failure = build_generation_failure_artifact(
778                    resource,
779                    role,
780                    plan.kind,
781                    "source blob was unavailable",
782                )?;
783                persist_derived_artifact(store, failure.clone()).await?;
784                failure
785            }
786            _ => continue,
787        };
788
789        known_artifacts.push(artifact.clone());
790        created.push(artifact);
791    }
792
793    Ok(created)
794}
795
796/// Map a derived artifact kind onto the evidence role callers should expose.
797#[must_use]
798pub const fn derived_artifact_evidence_role(kind: DerivedArtifactKind) -> EvidenceRole {
799    match kind {
800        DerivedArtifactKind::Preview | DerivedArtifactKind::Thumbnail => EvidenceRole::Preview,
801        DerivedArtifactKind::OcrText
802        | DerivedArtifactKind::Transcript
803        | DerivedArtifactKind::Caption
804        | DerivedArtifactKind::SyntaxSummary
805        | DerivedArtifactKind::SchemaSummary
806        | DerivedArtifactKind::GenerationFailure => EvidenceRole::Derived,
807    }
808}
809
810/// Build explicit evidence links for derived artifacts created during ingest.
811#[must_use]
812pub fn evidence_links_for_derived_artifacts(
813    artifacts: &[DerivedArtifact],
814    part_index: Option<u32>,
815) -> Vec<hirn_core::EvidenceLink> {
816    artifacts
817        .iter()
818        .filter(|artifact| artifact.kind != DerivedArtifactKind::GenerationFailure)
819        .map(|artifact| {
820            let mut link = hirn_core::EvidenceLink::new(
821                artifact.resource_id,
822                derived_artifact_evidence_role(artifact.kind),
823            )
824            .with_artifact(artifact.id)
825            .with_provenance(artifact.kind.evidence_provenance())
826            .with_description(artifact.kind.as_str());
827            if let Some(part_index) = part_index {
828                link = link.with_part_index(part_index);
829            }
830            link
831        })
832        .collect()
833}
834
835/// Stable checksum for text-backed resources whose semantics depend on more than raw bytes.
836#[must_use]
837pub fn text_backed_resource_checksum(discriminator: &str, payload: &[u8]) -> String {
838    let mut hasher = blake3::Hasher::new();
839    hasher.update(discriminator.as_bytes());
840    hasher.update(&[0]);
841    hasher.update(payload);
842    format!("blake3:{}", hasher.finalize().to_hex())
843}
844
845fn artifact_failure_matches(
846    artifact: &DerivedArtifact,
847    intended_kind: DerivedArtifactKind,
848) -> bool {
849    artifact.kind == DerivedArtifactKind::GenerationFailure
850        && matches!(
851            artifact.metadata.get("intended_kind"),
852            Some(hirn_core::metadata::MetadataValue::String(value)) if value == intended_kind.as_str()
853        )
854}
855
856fn build_generation_failure_artifact(
857    resource: &ResourceObject,
858    role: EvidenceRole,
859    intended_kind: DerivedArtifactKind,
860    reason: &str,
861) -> Result<DerivedArtifact, HirnDbError> {
862    DerivedArtifact::builder()
863        .resource_id(resource.id)
864        .kind(DerivedArtifactKind::GenerationFailure)
865        .modality(ModalityProfile::Text)
866        .text_content(format!(
867            "{} generation failed: {reason}",
868            intended_kind.as_str()
869        ))
870        .metadata_entry("intended_kind", intended_kind.as_str().to_string())
871        .metadata_entry("failure_reason", reason.to_string())
872        .metadata_entry("source_role", role.as_str().to_string())
873        .namespace(resource.namespace)
874        .build()
875        .map_err(|error| HirnDbError::InvalidArgument(error.to_string()))
876}
877
878fn build_binary_derived_artifact(
879    resource: &ResourceObject,
880    kind: DerivedArtifactKind,
881    blob_bytes: &[u8],
882    mime_type: Option<&str>,
883    existing_artifacts: &[DerivedArtifact],
884) -> Result<(DerivedArtifact, Vec<u8>), String> {
885    match kind {
886        DerivedArtifactKind::Thumbnail => {
887            let (thumbnail_bytes, width, height) = generate_thumbnail_bytes(blob_bytes, mime_type)?;
888            let blob_index = next_derived_artifact_blob_index(existing_artifacts);
889            let mut builder = DerivedArtifact::builder()
890                .resource_id(resource.id)
891                .kind(DerivedArtifactKind::Thumbnail)
892                .modality(ModalityProfile::Image)
893                .mime_type("image/png")
894                .blob_index(blob_index)
895                .checksum(format!(
896                    "blake3:{}",
897                    blake3::hash(&thumbnail_bytes).to_hex()
898                ))
899                .metadata_entry("generation_strategy", "downscaled_source_image")
900                .metadata_entry("max_dimension_px", i64::from(THUMBNAIL_MAX_DIMENSION_PX))
901                .metadata_entry("width_px", i64::from(width))
902                .metadata_entry("height_px", i64::from(height))
903                .namespace(resource.namespace);
904            if let Some(mime_type) = mime_type {
905                builder = builder.metadata_entry("source_mime_type", mime_type.to_string());
906            }
907            let artifact = builder.build().map_err(|error| error.to_string())?;
908            Ok((artifact, thumbnail_bytes))
909        }
910        other => Err(format!(
911            "unsupported binary derived artifact kind: {}",
912            other.as_str()
913        )),
914    }
915}
916
917fn generate_thumbnail_bytes(
918    blob_bytes: &[u8],
919    mime_type: Option<&str>,
920) -> Result<(Vec<u8>, u32, u32), String> {
921    let image = if let Some(format) = mime_type.and_then(image_format_from_mime_type) {
922        image::load_from_memory_with_format(blob_bytes, format)
923    } else {
924        image::load_from_memory(blob_bytes)
925    }
926    .map_err(|error| format!("failed to decode image for thumbnail generation: {error}"))?;
927
928    let thumbnail = image.thumbnail(THUMBNAIL_MAX_DIMENSION_PX, THUMBNAIL_MAX_DIMENSION_PX);
929    let width = thumbnail.width();
930    let height = thumbnail.height();
931    let mut encoded = Cursor::new(Vec::new());
932    thumbnail
933        .write_to(&mut encoded, ImageFormat::Png)
934        .map_err(|error| format!("failed to encode thumbnail image: {error}"))?;
935    Ok((encoded.into_inner(), width, height))
936}
937
938fn image_format_from_mime_type(mime_type: &str) -> Option<ImageFormat> {
939    match mime_type {
940        "image/png" => Some(ImageFormat::Png),
941        "image/jpeg" | "image/jpg" => Some(ImageFormat::Jpeg),
942        "image/gif" => Some(ImageFormat::Gif),
943        "image/webp" => Some(ImageFormat::WebP),
944        "image/bmp" => Some(ImageFormat::Bmp),
945        "image/tiff" => Some(ImageFormat::Tiff),
946        _ => None,
947    }
948}
949
950const fn next_derived_artifact_blob_index(existing_artifacts: &[DerivedArtifact]) -> u32 {
951    let mut next = 1;
952    let mut idx = 0;
953    while idx < existing_artifacts.len() {
954        if let Some(blob_index) = existing_artifacts[idx].blob_index
955            && blob_index >= next
956        {
957            next = blob_index + 1;
958        }
959        idx += 1;
960    }
961    next
962}
963
964async fn persist_derived_artifact_with_blob(
965    store: &dyn PhysicalStore,
966    artifact: DerivedArtifact,
967    blob_bytes: Vec<u8>,
968) -> Result<(), HirnDbError> {
969    let blob_index = artifact.blob_index.ok_or_else(|| {
970        HirnDbError::InvalidArgument("blob-backed derived artifact requires blob_index".into())
971    })?;
972    let row = blob_ds::ResourceBlobRow {
973        resource_id: artifact.resource_id,
974        blob_index,
975        data: blob_bytes,
976    };
977    let batch = blob_ds::to_batch(std::slice::from_ref(&row))?;
978    store.append(blob_ds::DATASET_NAME, batch).await?;
979    if let Err(error) = persist_derived_artifact(store, artifact).await {
980        let filter = format!(
981            "resource_id = '{}' AND blob_index = {}",
982            row.resource_id, row.blob_index
983        );
984        let _ = store.delete(blob_ds::DATASET_NAME, &filter).await;
985        return Err(error);
986    }
987    Ok(())
988}
989
990/// Fetch resource metadata only.
991pub async fn get_resource(
992    store: &dyn PhysicalStore,
993    resource_id: ResourceId,
994) -> Result<Option<ResourceObject>, HirnDbError> {
995    let Some(resource) = get_resource_raw(store, resource_id).await? else {
996        return Ok(None);
997    };
998
999    sanitize_resource_for_effective_head(store, resource)
1000        .await
1001        .map(Some)
1002}
1003
1004async fn get_resource_raw(
1005    store: &dyn PhysicalStore,
1006    resource_id: ResourceId,
1007) -> Result<Option<ResourceObject>, HirnDbError> {
1008    let filter = format!("id = '{}'", resource_id);
1009    let batches = store
1010        .scan(
1011            resource_object::DATASET_NAME,
1012            ScanOptions {
1013                filter: Some(filter),
1014                exact_filter: None,
1015                columns: None,
1016                order_by: None,
1017                limit: Some(1),
1018                offset: None,
1019            },
1020        )
1021        .await?;
1022
1023    for batch in &batches {
1024        let mut decoded = resource_object::from_batch(batch)?;
1025        if let Some(resource) = decoded.pop().filter(ResourceObject::is_storage_ready) {
1026            return Ok(Some(resource));
1027        }
1028    }
1029
1030    Ok(None)
1031}
1032
1033/// Load the blob payload for a resource-backed artifact.
1034pub async fn load_resource_blob(
1035    store: &dyn PhysicalStore,
1036    resource_id: ResourceId,
1037    blob_index: u32,
1038) -> Result<Vec<u8>, HirnDbError> {
1039    let Some(resource) = get_resource_raw(store, resource_id).await? else {
1040        return Err(HirnDbError::BlobError {
1041            dataset: resource_object::DATASET_NAME.to_string(),
1042            details: format!("resource not found or not visible: {resource_id}"),
1043        });
1044    };
1045
1046    if effective_head_for_logical_id(store, resource.logical_resource_id)
1047        .await?
1048        .is_some_and(|head| head.governance_state.hides_payload())
1049    {
1050        return Err(HirnDbError::BlobError {
1051            dataset: blob_ds::DATASET_NAME.to_string(),
1052            details: format!(
1053                "resource payload unavailable: {resource_id} is governed by the active head"
1054            ),
1055        });
1056    }
1057
1058    load_resource_blob_unchecked(store, resource_id, blob_index).await
1059}
1060
1061async fn load_resource_blob_unchecked(
1062    store: &dyn PhysicalStore,
1063    resource_id: ResourceId,
1064    blob_index: u32,
1065) -> Result<Vec<u8>, HirnDbError> {
1066    let filter = format!(
1067        "resource_id = '{}' AND blob_index = {}",
1068        resource_id, blob_index
1069    );
1070    let batches = store
1071        .scan(
1072            blob_ds::DATASET_NAME,
1073            ScanOptions {
1074                filter: Some(filter),
1075                exact_filter: None,
1076                columns: Some(vec!["data".to_string()]),
1077                order_by: None,
1078                limit: Some(1),
1079                offset: None,
1080            },
1081        )
1082        .await?;
1083
1084    let batch = batches.first().ok_or_else(|| HirnDbError::BlobError {
1085        dataset: blob_ds::DATASET_NAME.to_string(),
1086        details: format!("resource blob not found: {resource_id}:{blob_index}"),
1087    })?;
1088    if batch.num_rows() == 0 {
1089        return Err(HirnDbError::BlobError {
1090            dataset: blob_ds::DATASET_NAME.to_string(),
1091            details: format!("resource blob not found: {resource_id}:{blob_index}"),
1092        });
1093    }
1094
1095    let array = batch
1096        .column_by_name("data")
1097        .ok_or_else(|| HirnDbError::InvalidArgument("missing data column".into()))?
1098        .as_any()
1099        .downcast_ref::<BinaryArray>()
1100        .ok_or_else(|| {
1101            HirnDbError::InvalidArgument("resource blob data column wrong type".into())
1102        })?;
1103    if array.is_null(0) {
1104        return Err(HirnDbError::BlobError {
1105            dataset: blob_ds::DATASET_NAME.to_string(),
1106            details: format!("resource blob was null: {resource_id}:{blob_index}"),
1107        });
1108    }
1109
1110    Ok(array.value(0).to_vec())
1111}
1112
1113/// List derived artifacts for a resource.
1114pub async fn list_derived_artifacts(
1115    store: &dyn PhysicalStore,
1116    resource_id: ResourceId,
1117) -> Result<Vec<DerivedArtifact>, HirnDbError> {
1118    let Some(resource) = get_resource_raw(store, resource_id).await? else {
1119        return Ok(Vec::new());
1120    };
1121
1122    if effective_head_for_logical_id(store, resource.logical_resource_id)
1123        .await?
1124        .is_some_and(|head| head.governance_state.hides_payload())
1125    {
1126        return Ok(Vec::new());
1127    }
1128
1129    let filter = format!("resource_id = '{}'", resource_id);
1130    let batches = store
1131        .scan(
1132            artifact_ds::DATASET_NAME,
1133            ScanOptions {
1134                filter: Some(filter),
1135                exact_filter: None,
1136                columns: None,
1137                order_by: None,
1138                limit: None,
1139                offset: None,
1140            },
1141        )
1142        .await?;
1143
1144    let mut decoded = Vec::new();
1145    for batch in &batches {
1146        decoded.extend(artifact_ds::from_batch(batch)?);
1147    }
1148    Ok(decoded)
1149}
1150
1151/// Fetch a resource with explicit metadata/preview/full hydration semantics.
1152pub async fn fetch_resource(
1153    store: &dyn PhysicalStore,
1154    resource_id: ResourceId,
1155    hydration_mode: HydrationMode,
1156) -> Result<Option<HydratedResource>, HirnDbError> {
1157    let Some(resource) = get_resource(store, resource_id).await? else {
1158        return Ok(None);
1159    };
1160
1161    let artifacts = if matches!(hydration_mode, HydrationMode::Preview | HydrationMode::Full) {
1162        list_derived_artifacts(store, resource_id).await?
1163    } else {
1164        Vec::new()
1165    };
1166
1167    let blob = if matches!(hydration_mode, HydrationMode::Full) {
1168        match resource.location {
1169            ResourceLocation::Blob { blob_index } => {
1170                Some(load_resource_blob(store, resource_id, blob_index).await?)
1171            }
1172            ResourceLocation::Inline | ResourceLocation::External { .. } => None,
1173        }
1174    } else {
1175        None
1176    };
1177
1178    Ok(Some(HydratedResource {
1179        resource,
1180        artifacts,
1181        blob,
1182    }))
1183}
1184
1185async fn find_live_resource_by_checksum(
1186    store: &dyn PhysicalStore,
1187    namespace: Namespace,
1188    checksum: &str,
1189) -> Result<Option<ResourceObject>, HirnDbError> {
1190    let escaped_checksum = checksum.replace('\'', "''");
1191    let escaped_namespace = namespace.as_str().replace('\'', "''");
1192    let filter = format!(
1193        "checksum = '{}' AND namespace = '{}'",
1194        escaped_checksum, escaped_namespace
1195    );
1196
1197    let batches = store
1198        .scan(
1199            resource_object::DATASET_NAME,
1200            ScanOptions {
1201                filter: Some(filter),
1202                exact_filter: None,
1203                columns: None,
1204                order_by: None,
1205                limit: Some(1),
1206                offset: None,
1207            },
1208        )
1209        .await?;
1210
1211    let mut matches = Vec::new();
1212    for batch in &batches {
1213        matches.extend(
1214            resource_object::from_batch(batch)?
1215                .into_iter()
1216                .filter(ResourceObject::is_storage_ready),
1217        );
1218    }
1219
1220    Ok(select_live_resource_match(&matches))
1221}
1222
1223async fn list_resource_revisions_for_logical_id(
1224    store: &dyn PhysicalStore,
1225    logical_resource_id: LogicalResourceId,
1226) -> Result<Vec<ResourceObject>, HirnDbError> {
1227    let escaped_logical_id = logical_resource_id.to_string().replace('\'', "''");
1228    let filter = format!("logical_resource_id = '{}'", escaped_logical_id);
1229    let batches = store
1230        .scan(
1231            resource_object::DATASET_NAME,
1232            ScanOptions {
1233                filter: Some(filter),
1234                exact_filter: None,
1235                columns: None,
1236                order_by: None,
1237                limit: None,
1238                offset: None,
1239            },
1240        )
1241        .await?;
1242
1243    let mut revisions = Vec::new();
1244    for batch in &batches {
1245        revisions.extend(
1246            resource_object::from_batch(batch)?
1247                .into_iter()
1248                .filter(ResourceObject::is_storage_ready),
1249        );
1250    }
1251    revisions.sort_by(|left, right| {
1252        left.version
1253            .cmp(&right.version)
1254            .then_with(|| left.created_at.millis().cmp(&right.created_at.millis()))
1255    });
1256    Ok(revisions)
1257}
1258
1259async fn effective_head_for_logical_id(
1260    store: &dyn PhysicalStore,
1261    logical_resource_id: LogicalResourceId,
1262) -> Result<Option<ResourceObject>, HirnDbError> {
1263    let revisions = list_resource_revisions_for_logical_id(store, logical_resource_id).await?;
1264    Ok(select_active_resource_head(&revisions))
1265}
1266
1267async fn sanitize_resource_for_effective_head(
1268    store: &dyn PhysicalStore,
1269    resource: ResourceObject,
1270) -> Result<ResourceObject, HirnDbError> {
1271    let head = effective_head_for_logical_id(store, resource.logical_resource_id).await?;
1272    Ok(apply_effective_head_governance(resource, head.as_ref()))
1273}
1274
1275fn apply_effective_head_governance(
1276    mut resource: ResourceObject,
1277    head: Option<&ResourceObject>,
1278) -> ResourceObject {
1279    let Some(head) = head.filter(|head| head.governance_state.hides_payload()) else {
1280        return resource;
1281    };
1282
1283    resource.governance_state = head.governance_state;
1284    resource.governance_reason = head.governance_reason.clone();
1285    resource.governed_at = head.governed_at;
1286    resource.location = ResourceLocation::Inline;
1287    resource.checksum = None;
1288    resource.size_bytes = 0;
1289    resource.mime_type = None;
1290    resource.display_name = Some(
1291        head.display_name
1292            .clone()
1293            .filter(|name| !name.trim().is_empty())
1294            .unwrap_or_else(|| head.governance_state.placeholder_display_name().to_string()),
1295    );
1296    resource
1297}
1298
1299fn select_active_resource_head(revisions: &[ResourceObject]) -> Option<ResourceObject> {
1300    revisions
1301        .iter()
1302        .filter(|resource| resource.is_storage_ready() && resource.superseded_by.is_none())
1303        .max_by_key(|resource| resource.version)
1304        .cloned()
1305        .or_else(|| {
1306            revisions
1307                .iter()
1308                .filter(|resource| resource.is_storage_ready())
1309                .max_by_key(|resource| resource.version)
1310                .cloned()
1311        })
1312}
1313
1314fn select_live_resource_match(revisions: &[ResourceObject]) -> Option<ResourceObject> {
1315    revisions
1316        .iter()
1317        .filter(|resource| resource.is_storage_ready() && resource.superseded_by.is_none())
1318        .max_by_key(|resource| resource.version)
1319        .cloned()
1320}
1321
1322fn build_successor_revision(
1323    current: &ResourceObject,
1324    reason: Option<String>,
1325    now: Timestamp,
1326) -> ResourceObject {
1327    let mut successor = current.clone();
1328    let successor_id = ResourceId::new();
1329    successor.id = successor_id;
1330    successor.logical_resource_id = current.logical_resource_id;
1331    successor.revision_id = ResourceRevisionId::from_resource_id(successor_id);
1332    successor.version = current.version + 1;
1333    successor.revision_operation = RevisionOperation::Supersede;
1334    successor.revision_reason = reason;
1335    successor.revision_causation_id = Some(current.id);
1336    successor.superseded_by = None;
1337    successor.created_at = now;
1338    successor.updated_at = now;
1339    successor
1340}
1341
1342async fn govern_resource(
1343    store: &dyn PhysicalStore,
1344    resource_id: ResourceId,
1345    state: ResourceGovernanceState,
1346    update: ResourceGovernanceUpdate,
1347) -> Result<ResourceObject, HirnDbError> {
1348    let Some(current) = get_resource_head(store, resource_id).await? else {
1349        return Err(HirnDbError::InvalidArgument(format!(
1350            "resource not found: {resource_id}"
1351        )));
1352    };
1353    if current.governance_state == state {
1354        return Ok(current);
1355    }
1356
1357    let now = Timestamp::now();
1358    let reason = normalize_optional_string(update.reason)
1359        .or_else(|| Some(format!("resource {}", state.as_str())));
1360    let mut successor = build_successor_revision(&current, reason.clone(), now);
1361    successor.governance_state = state;
1362    successor.governance_reason = reason;
1363    successor.governed_at = Some(now);
1364    successor.display_name = Some(
1365        normalize_optional_string(update.placeholder_display_name)
1366            .unwrap_or_else(|| state.placeholder_display_name().to_string()),
1367    );
1368    successor.mime_type = None;
1369    successor.checksum = None;
1370    successor.size_bytes = 0;
1371    successor.location = ResourceLocation::Inline;
1372
1373    append_resource_revision(store, &successor, None).await?;
1374
1375    let mut updated_current = current.clone();
1376    updated_current.superseded_by = Some(successor.id);
1377    updated_current.updated_at = now;
1378    if let Err(error) = upsert_resource_revision(store, &updated_current).await {
1379        rollback_resource_revision(store, &successor).await;
1380        return Err(error);
1381    }
1382
1383    let _ = delete_lineage_payloads_and_artifacts(store, current.logical_resource_id).await;
1384
1385    Ok(successor)
1386}
1387
1388async fn list_active_resource_heads(
1389    store: &dyn PhysicalStore,
1390) -> Result<Vec<ResourceObject>, HirnDbError> {
1391    let batches = store
1392        .scan(resource_object::DATASET_NAME, ScanOptions::default())
1393        .await?;
1394    let mut grouped: BTreeMap<LogicalResourceId, Vec<ResourceObject>> = BTreeMap::new();
1395    for batch in &batches {
1396        for resource in resource_object::from_batch(batch)? {
1397            if !resource.is_storage_ready() {
1398                continue;
1399            }
1400            grouped
1401                .entry(resource.logical_resource_id)
1402                .or_default()
1403                .push(resource);
1404        }
1405    }
1406
1407    Ok(grouped
1408        .into_values()
1409        .filter_map(|revisions| select_active_resource_head(&revisions))
1410        .collect())
1411}
1412
1413const fn governance_state_for_action(action: ResourceRetentionAction) -> ResourceGovernanceState {
1414    match action {
1415        ResourceRetentionAction::Redact => ResourceGovernanceState::Redacted,
1416        ResourceRetentionAction::Purge => ResourceGovernanceState::Purged,
1417    }
1418}
1419
1420async fn delete_lineage_payloads_and_artifacts(
1421    store: &dyn PhysicalStore,
1422    logical_resource_id: LogicalResourceId,
1423) -> Result<(), HirnDbError> {
1424    let resource_ids = list_resource_revisions_for_logical_id(store, logical_resource_id)
1425        .await?
1426        .into_iter()
1427        .map(|resource| resource.id)
1428        .collect::<Vec<_>>();
1429    if resource_ids.is_empty() {
1430        return Ok(());
1431    }
1432
1433    delete_rows_for_resource_ids(store, blob_ds::DATASET_NAME, "resource_id", &resource_ids)
1434        .await?;
1435    delete_rows_for_resource_ids(
1436        store,
1437        artifact_ds::DATASET_NAME,
1438        "resource_id",
1439        &resource_ids,
1440    )
1441    .await
1442}
1443
1444async fn delete_rows_for_resource_ids(
1445    store: &dyn PhysicalStore,
1446    dataset: &str,
1447    column: &str,
1448    resource_ids: &[ResourceId],
1449) -> Result<(), HirnDbError> {
1450    if resource_ids.is_empty() {
1451        return Ok(());
1452    }
1453
1454    let filter = resource_ids
1455        .iter()
1456        .map(|resource_id| {
1457            format!(
1458                "{column} = '{}'",
1459                resource_id.to_string().replace('\'', "''")
1460            )
1461        })
1462        .collect::<Vec<_>>()
1463        .join(" OR ");
1464    store.delete(dataset, &filter).await.map(|_| ())
1465}
1466
1467async fn append_resource_revision(
1468    store: &dyn PhysicalStore,
1469    resource: &ResourceObject,
1470    blob: Option<Vec<u8>>,
1471) -> Result<(), HirnDbError> {
1472    let mut staged_resource = resource.clone();
1473    let requires_finalize = matches!(
1474        (&staged_resource.location, blob.as_ref()),
1475        (ResourceLocation::Blob { .. }, Some(_))
1476    );
1477    if requires_finalize {
1478        staged_resource.storage_ready = false;
1479    }
1480
1481    let batch = resource_object::to_batch(std::slice::from_ref(&staged_resource))?;
1482    store.append(resource_object::DATASET_NAME, batch).await?;
1483
1484    if let (ResourceLocation::Blob { blob_index }, Some(blob_bytes)) =
1485        (&staged_resource.location, blob)
1486    {
1487        let row = blob_ds::ResourceBlobRow {
1488            resource_id: staged_resource.id,
1489            blob_index: *blob_index,
1490            data: blob_bytes,
1491        };
1492        let batch = blob_ds::to_batch(std::slice::from_ref(&row))?;
1493        if let Err(error) = store.append(blob_ds::DATASET_NAME, batch).await {
1494            rollback_resource_revision(store, &staged_resource).await;
1495            return Err(error);
1496        }
1497    }
1498
1499    if requires_finalize {
1500        staged_resource.storage_ready = true;
1501        if let Err(error) = upsert_resource_revision(store, &staged_resource).await {
1502            rollback_resource_revision(store, &staged_resource).await;
1503            return Err(error);
1504        }
1505    }
1506
1507    Ok(())
1508}
1509
1510async fn upsert_resource_revision(
1511    store: &dyn PhysicalStore,
1512    resource: &ResourceObject,
1513) -> Result<(), HirnDbError> {
1514    let batch = resource_object::to_batch(std::slice::from_ref(resource))?;
1515    store
1516        .merge_insert(resource_object::DATASET_NAME, &["id"], batch)
1517        .await
1518}
1519
1520async fn rollback_resource_revision(store: &dyn PhysicalStore, resource: &ResourceObject) {
1521    let resource_filter = format!("id = '{}'", resource.id);
1522    let _ = store
1523        .delete(resource_object::DATASET_NAME, &resource_filter)
1524        .await;
1525
1526    if matches!(resource.location, ResourceLocation::Blob { .. }) {
1527        let blob_filter = format!("resource_id = '{}'", resource.id);
1528        let _ = store.delete(blob_ds::DATASET_NAME, &blob_filter).await;
1529    }
1530}
1531
1532async fn prepare_blob_payload(
1533    store: &dyn PhysicalStore,
1534    current: Option<&ResourceObject>,
1535    resource: &mut ResourceObject,
1536    blob: Option<Vec<u8>>,
1537) -> Result<Option<Vec<u8>>, HirnDbError> {
1538    match (&resource.location, blob) {
1539        (ResourceLocation::Blob { .. }, Some(blob_bytes)) => {
1540            sync_blob_metadata(resource, &blob_bytes)?;
1541            Ok(Some(blob_bytes))
1542        }
1543        (ResourceLocation::Blob { .. }, None) => {
1544            let Some(current) = current else {
1545                return Err(HirnDbError::InvalidArgument(
1546                    "blob-backed resource requires payload bytes".into(),
1547                ));
1548            };
1549            let ResourceLocation::Blob { blob_index } = current.location else {
1550                return Err(HirnDbError::InvalidArgument(
1551                    "cannot supersede a non-blob resource without new payload bytes".into(),
1552                ));
1553            };
1554            let blob_bytes = load_resource_blob_unchecked(store, current.id, blob_index).await?;
1555            sync_blob_metadata(resource, &blob_bytes)?;
1556            Ok(Some(blob_bytes))
1557        }
1558        (ResourceLocation::Inline | ResourceLocation::External { .. }, Some(_)) => {
1559            Err(HirnDbError::InvalidArgument(
1560                "only ResourceLocation::Blob may carry persisted payload bytes".into(),
1561            ))
1562        }
1563        (ResourceLocation::Inline | ResourceLocation::External { .. }, None) => Ok(None),
1564    }
1565}
1566
1567fn sync_blob_metadata(resource: &mut ResourceObject, blob: &[u8]) -> Result<(), HirnDbError> {
1568    if resource.checksum.is_none() {
1569        resource.checksum = Some(format!("blake3:{}", blake3::hash(blob).to_hex()));
1570    }
1571    resource.size_bytes = blob.len() as u64;
1572    Ok(())
1573}
1574
/// Aggregate footprint of the active resource heads inside one quota scope.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
struct ResourceQuotaUsage {
    /// How many active heads fall inside the scope.
    active_resources: usize,
    /// Sum of those heads' `size_bytes`, accumulated with saturating addition.
    total_bytes: u64,
}
1580
1581async fn enforce_resource_quota_policy(
1582    store: &dyn PhysicalStore,
1583    new_head: &ResourceObject,
1584    replaced_head: Option<&ResourceObject>,
1585    quota_policy: &ResourceQuotaPolicy,
1586) -> Result<(), HirnDbError> {
1587    if quota_policy.is_empty() {
1588        return Ok(());
1589    }
1590
1591    let active_heads = list_active_resource_heads(store).await?;
1592    for rule in quota_policy.rules_for(new_head) {
1593        let usage = quota_usage_for_scope(&active_heads, rule.scope);
1594        let projected_active_resources =
1595            usage.active_resources + usize::from(replaced_head.is_none());
1596        if let Some(max_active_resources) = rule.max_active_resources
1597            && projected_active_resources > max_active_resources
1598        {
1599            return Err(HirnDbError::LimitExceeded(format!(
1600                "resource quota exceeded for {}: projected {} active resources exceeds limit {}",
1601                quota_scope_label(rule.scope),
1602                projected_active_resources,
1603                max_active_resources,
1604            )));
1605        }
1606
1607        let replaced_bytes = replaced_head.map_or(0, |head| head.size_bytes);
1608        let projected_total_bytes = usage
1609            .total_bytes
1610            .saturating_sub(replaced_bytes)
1611            .saturating_add(new_head.size_bytes);
1612        if let Some(max_total_bytes) = rule.max_total_bytes
1613            && projected_total_bytes > max_total_bytes
1614        {
1615            return Err(HirnDbError::LimitExceeded(format!(
1616                "resource quota exceeded for {}: projected {} bytes exceeds limit {}",
1617                quota_scope_label(rule.scope),
1618                projected_total_bytes,
1619                max_total_bytes,
1620            )));
1621        }
1622    }
1623
1624    Ok(())
1625}
1626
1627fn quota_usage_for_scope(
1628    active_heads: &[ResourceObject],
1629    scope: ResourceQuotaScope,
1630) -> ResourceQuotaUsage {
1631    active_heads
1632        .iter()
1633        .filter(|resource| scope.matches(resource))
1634        .fold(ResourceQuotaUsage::default(), |mut usage, resource| {
1635            usage.active_resources += 1;
1636            usage.total_bytes = usage.total_bytes.saturating_add(resource.size_bytes);
1637            usage
1638        })
1639}
1640
1641fn quota_scope_label(scope: ResourceQuotaScope) -> String {
1642    match scope {
1643        ResourceQuotaScope::Realm => "realm".to_string(),
1644        ResourceQuotaScope::Namespace(namespace) => format!("namespace `{}`", namespace.as_str()),
1645        ResourceQuotaScope::Agent(agent_id) => format!("agent `{}`", agent_id.as_str()),
1646    }
1647}
1648
1649const fn default_text_artifact_plan(
1650    modality: ModalityProfile,
1651    role: EvidenceRole,
1652) -> &'static [DefaultTextArtifactPlan] {
1653    match role {
1654        EvidenceRole::Source => match modality {
1655            ModalityProfile::Image => &IMAGE_SOURCE_TEXT_ARTIFACTS,
1656            ModalityProfile::Audio => &AUDIO_SOURCE_TEXT_ARTIFACTS,
1657            ModalityProfile::Code => &CODE_SOURCE_TEXT_ARTIFACTS,
1658            ModalityProfile::Structured => &STRUCTURED_SOURCE_TEXT_ARTIFACTS,
1659            ModalityProfile::Text => &NO_TEXT_ARTIFACTS,
1660            _ => &PREVIEW_TEXT_ARTIFACTS,
1661        },
1662        EvidenceRole::Attachment
1663        | EvidenceRole::Proof
1664        | EvidenceRole::Output
1665        | EvidenceRole::Preview
1666        | EvidenceRole::Derived => &PREVIEW_TEXT_ARTIFACTS,
1667    }
1668}
1669
1670const fn default_blob_artifact_plan(
1671    modality: ModalityProfile,
1672    role: EvidenceRole,
1673) -> &'static [DefaultBlobArtifactPlan] {
1674    match role {
1675        EvidenceRole::Source => match modality {
1676            ModalityProfile::Image => &IMAGE_SOURCE_BLOB_ARTIFACTS,
1677            _ => &NO_BLOB_ARTIFACTS,
1678        },
1679        EvidenceRole::Attachment
1680        | EvidenceRole::Proof
1681        | EvidenceRole::Output
1682        | EvidenceRole::Preview
1683        | EvidenceRole::Derived => &NO_BLOB_ARTIFACTS,
1684    }
1685}
1686
/// Trim surrounding whitespace and treat blank strings as absent.
///
/// `None` and all-whitespace input both yield `None`. Anything else yields the trimmed string.
fn normalize_optional_string(value: Option<String>) -> Option<String> {
    let trimmed = value?.trim().to_string();
    (!trimmed.is_empty()).then_some(trimmed)
}
1692
1693#[cfg(test)]
1694mod tests {
1695    use super::*;
1696    use std::sync::Arc;
1697
1698    use arrow_array::RecordBatch;
1699    use async_trait::async_trait;
1700    use datafusion::catalog::TableProvider;
1701    use hirn_core::{
1702        DerivedArtifact, DerivedArtifactKind, ModalityProfile, ResourceGovernanceState,
1703        ResourceLocation, ResourceObject, ResourceQuotaPolicy, ResourceQuotaRule,
1704        ResourceQuotaScope, ResourceRetentionAction, ResourceRetentionPolicy,
1705        ResourceRetentionRule, RevisionState,
1706    };
1707
1708    use crate::HirnDbError;
1709    use crate::datasets::{resource_blob as blob_ds, resource_object};
1710    use crate::memory_store::MemoryStore;
1711    use crate::mutation_envelope_ops::{MutationEnvelopeState, get_mutation_envelope};
1712    use crate::policy_store::{CURRENT_PRINCIPAL, NamespacePolicy, PolicyEnforcedStore};
1713    use crate::store::{
1714        ColumnTransform, CompactOptions, CompactResult, DatasetInfo, FtsSearchOptions,
1715        HybridSearchOptions, IndexConfig, MultivectorSearchOptions, PhysicalStore, ScanOptions,
1716        VectorSearchOptions, VersionTag,
1717    };
1718
1719    struct FaultInjectingStore {
1720        inner: MemoryStore,
1721        fail_blob_append: bool,
1722        fail_resource_merge_insert: bool,
1723    }
1724
1725    #[async_trait]
1726    impl PhysicalStore for FaultInjectingStore {
1727        async fn append(&self, dataset: &str, batch: RecordBatch) -> Result<(), HirnDbError> {
1728            if self.fail_blob_append && dataset == blob_ds::DATASET_NAME {
1729                return Err(HirnDbError::Unsupported(
1730                    "simulated blob append failure".to_string(),
1731                ));
1732            }
1733            self.inner.append(dataset, batch).await
1734        }
1735
1736        async fn append_batches(
1737            &self,
1738            dataset: &str,
1739            batches: Vec<RecordBatch>,
1740        ) -> Result<(), HirnDbError> {
1741            for batch in batches {
1742                self.append(dataset, batch).await?;
1743            }
1744            Ok(())
1745        }
1746
1747        async fn scan(
1748            &self,
1749            dataset: &str,
1750            opts: ScanOptions,
1751        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1752            self.inner.scan(dataset, opts).await
1753        }
1754
1755        async fn scan_stream(
1756            &self,
1757            dataset: &str,
1758            opts: ScanOptions,
1759        ) -> Result<crate::store::RecordBatchStream, HirnDbError> {
1760            self.inner.scan_stream(dataset, opts).await
1761        }
1762
1763        async fn delete(&self, dataset: &str, predicate: &str) -> Result<u64, HirnDbError> {
1764            self.inner.delete(dataset, predicate).await
1765        }
1766
1767        async fn merge_insert(
1768            &self,
1769            dataset: &str,
1770            on: &[&str],
1771            batch: RecordBatch,
1772        ) -> Result<(), HirnDbError> {
1773            if self.fail_resource_merge_insert && dataset == resource_object::DATASET_NAME {
1774                return Err(HirnDbError::Unsupported(
1775                    "simulated resource finalize failure".to_string(),
1776                ));
1777            }
1778            self.inner.merge_insert(dataset, on, batch).await
1779        }
1780
1781        async fn update_where(
1782            &self,
1783            dataset: &str,
1784            filter: &str,
1785            updates: &[(&str, &str)],
1786        ) -> Result<u64, HirnDbError> {
1787            self.inner.update_where(dataset, filter, updates).await
1788        }
1789
1790        async fn count(&self, dataset: &str, filter: Option<&str>) -> Result<u64, HirnDbError> {
1791            self.inner.count(dataset, filter).await
1792        }
1793
1794        async fn vector_search(
1795            &self,
1796            dataset: &str,
1797            opts: VectorSearchOptions,
1798        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1799            self.inner.vector_search(dataset, opts).await
1800        }
1801
1802        async fn vector_search_many(
1803            &self,
1804            dataset: &str,
1805            queries: Vec<VectorSearchOptions>,
1806        ) -> Result<Vec<Vec<RecordBatch>>, HirnDbError> {
1807            self.inner.vector_search_many(dataset, queries).await
1808        }
1809
1810        async fn fts_search(
1811            &self,
1812            dataset: &str,
1813            opts: FtsSearchOptions,
1814        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1815            self.inner.fts_search(dataset, opts).await
1816        }
1817
1818        async fn hybrid_search(
1819            &self,
1820            dataset: &str,
1821            opts: HybridSearchOptions,
1822        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1823            self.inner.hybrid_search(dataset, opts).await
1824        }
1825
1826        async fn multivector_search(
1827            &self,
1828            dataset: &str,
1829            opts: MultivectorSearchOptions,
1830        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1831            self.inner.multivector_search(dataset, opts).await
1832        }
1833
1834        async fn create_index(
1835            &self,
1836            dataset: &str,
1837            config: IndexConfig,
1838        ) -> Result<(), HirnDbError> {
1839            self.inner.create_index(dataset, config).await
1840        }
1841
1842        async fn optimize_indices(&self, dataset: &str) -> Result<(), HirnDbError> {
1843            self.inner.optimize_indices(dataset).await
1844        }
1845
1846        async fn compact(
1847            &self,
1848            dataset: &str,
1849            opts: CompactOptions,
1850        ) -> Result<CompactResult, HirnDbError> {
1851            self.inner.compact(dataset, opts).await
1852        }
1853
1854        async fn version(&self, dataset: &str) -> Result<u64, HirnDbError> {
1855            self.inner.version(dataset).await
1856        }
1857
1858        async fn tag(&self, dataset: &str, tag: &str) -> Result<(), HirnDbError> {
1859            self.inner.tag(dataset, tag).await
1860        }
1861
1862        async fn checkout(&self, dataset: &str, version: u64) -> Result<(), HirnDbError> {
1863            self.inner.checkout(dataset, version).await
1864        }
1865
1866        async fn list_tags(&self, dataset: &str) -> Result<Vec<VersionTag>, HirnDbError> {
1867            self.inner.list_tags(dataset).await
1868        }
1869
1870        async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, HirnDbError> {
1871            self.inner.list_datasets().await
1872        }
1873
1874        async fn exists(&self, dataset: &str) -> Result<bool, HirnDbError> {
1875            self.inner.exists(dataset).await
1876        }
1877
1878        async fn list_namespaces(&self) -> Result<Vec<String>, HirnDbError> {
1879            self.inner.list_namespaces().await
1880        }
1881
1882        async fn create_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1883            self.inner.create_namespace(name).await
1884        }
1885
1886        async fn drop_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1887            self.inner.drop_namespace(name).await
1888        }
1889
1890        async fn add_columns(
1891            &self,
1892            dataset: &str,
1893            transforms: Vec<ColumnTransform>,
1894        ) -> Result<(), HirnDbError> {
1895            self.inner.add_columns(dataset, transforms).await
1896        }
1897
1898        async fn drop_columns(&self, dataset: &str, columns: &[&str]) -> Result<(), HirnDbError> {
1899            self.inner.drop_columns(dataset, columns).await
1900        }
1901
1902        async fn table_provider(&self, dataset: &str) -> Option<Arc<dyn TableProvider>> {
1903            self.inner.table_provider(dataset).await
1904        }
1905    }
1906
1907    #[tokio::test(flavor = "multi_thread")]
1908    async fn persist_resource_deduplicates_by_checksum_within_namespace() {
1909        let store = MemoryStore::new();
1910        let blob = vec![1_u8; 2048];
1911
1912        let first = ResourceObject::builder()
1913            .modality(ModalityProfile::Image)
1914            .mime_type("image/png")
1915            .checksum("blake3:dedup")
1916            .size_bytes(blob.len() as u64)
1917            .location(ResourceLocation::Blob { blob_index: 0 })
1918            .build()
1919            .unwrap();
1920        let second = ResourceObject::builder()
1921            .modality(ModalityProfile::Image)
1922            .mime_type("image/png")
1923            .checksum("blake3:dedup")
1924            .size_bytes(blob.len() as u64)
1925            .location(ResourceLocation::Blob { blob_index: 0 })
1926            .build()
1927            .unwrap();
1928
1929        let persisted_first = persist_resource(&store, first, Some(blob.clone()))
1930            .await
1931            .unwrap();
1932        let persisted_second = persist_resource(&store, second, Some(blob)).await.unwrap();
1933
1934        assert_eq!(persisted_first.id, persisted_second.id);
1935
1936        let resources = store
1937            .scan(resource_object::DATASET_NAME, ScanOptions::default())
1938            .await
1939            .unwrap();
1940        let blobs = store
1941            .scan(blob_ds::DATASET_NAME, ScanOptions::default())
1942            .await
1943            .unwrap();
1944
1945        assert_eq!(resources.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
1946        assert_eq!(blobs.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
1947    }
1948
1949    #[tokio::test(flavor = "multi_thread")]
1950    async fn persist_resource_rolls_back_when_blob_append_fails() {
1951        let store = FaultInjectingStore {
1952            inner: MemoryStore::new(),
1953            fail_blob_append: true,
1954            fail_resource_merge_insert: false,
1955        };
1956        let blob = vec![4_u8; 128];
1957        let resource = ResourceObject::builder()
1958            .modality(ModalityProfile::Document)
1959            .location(ResourceLocation::Blob { blob_index: 0 })
1960            .build()
1961            .unwrap();
1962        let resource_id = resource.id;
1963
1964        let error = persist_resource(&store, resource, Some(blob))
1965            .await
1966            .unwrap_err();
1967
1968        assert!(matches!(error, HirnDbError::Unsupported(_)));
1969        assert!(get_resource(&store, resource_id).await.unwrap().is_none());
1970        assert!(
1971            get_resource_head(&store, resource_id)
1972                .await
1973                .unwrap()
1974                .is_none()
1975        );
1976        assert_eq!(
1977            store
1978                .scan(resource_object::DATASET_NAME, ScanOptions::default())
1979                .await
1980                .unwrap()
1981                .iter()
1982                .map(|batch| batch.num_rows())
1983                .sum::<usize>(),
1984            0
1985        );
1986        assert_eq!(
1987            store
1988                .scan(blob_ds::DATASET_NAME, ScanOptions::default())
1989                .await
1990                .unwrap()
1991                .iter()
1992                .map(|batch| batch.num_rows())
1993                .sum::<usize>(),
1994            0
1995        );
1996    }
1997
1998    #[tokio::test(flavor = "multi_thread")]
1999    async fn persist_resource_rolls_back_when_visibility_finalize_fails() {
2000        let store = FaultInjectingStore {
2001            inner: MemoryStore::new(),
2002            fail_blob_append: false,
2003            fail_resource_merge_insert: true,
2004        };
2005        let blob = vec![6_u8; 128];
2006        let resource = ResourceObject::builder()
2007            .modality(ModalityProfile::Document)
2008            .location(ResourceLocation::Blob { blob_index: 0 })
2009            .build()
2010            .unwrap();
2011        let resource_id = resource.id;
2012
2013        let error = persist_resource(&store, resource, Some(blob))
2014            .await
2015            .unwrap_err();
2016
2017        assert!(matches!(error, HirnDbError::Unsupported(_)));
2018        assert!(get_resource(&store, resource_id).await.unwrap().is_none());
2019        assert!(
2020            get_resource_head(&store, resource_id)
2021                .await
2022                .unwrap()
2023                .is_none()
2024        );
2025        assert_eq!(
2026            store
2027                .scan(resource_object::DATASET_NAME, ScanOptions::default())
2028                .await
2029                .unwrap()
2030                .iter()
2031                .map(|batch| batch.num_rows())
2032                .sum::<usize>(),
2033            0
2034        );
2035        assert_eq!(
2036            store
2037                .scan(blob_ds::DATASET_NAME, ScanOptions::default())
2038                .await
2039                .unwrap()
2040                .iter()
2041                .map(|batch| batch.num_rows())
2042                .sum::<usize>(),
2043            0
2044        );
2045    }
2046
2047    #[tokio::test(flavor = "multi_thread")]
2048    async fn persist_resource_dedup_does_not_cross_namespaces() {
2049        let store = MemoryStore::new();
2050        let blob = vec![5_u8; 64];
2051
2052        let alpha = ResourceObject::builder()
2053            .modality(ModalityProfile::Document)
2054            .checksum("blake3:isolation")
2055            .size_bytes(blob.len() as u64)
2056            .namespace(Namespace::new("alpha").unwrap())
2057            .location(ResourceLocation::Blob { blob_index: 0 })
2058            .build()
2059            .unwrap();
2060        let beta = ResourceObject::builder()
2061            .modality(ModalityProfile::Document)
2062            .checksum("blake3:isolation")
2063            .size_bytes(blob.len() as u64)
2064            .namespace(Namespace::new("beta").unwrap())
2065            .location(ResourceLocation::Blob { blob_index: 0 })
2066            .build()
2067            .unwrap();
2068
2069        let alpha = persist_resource(&store, alpha, Some(blob.clone()))
2070            .await
2071            .unwrap();
2072        let beta = persist_resource(&store, beta, Some(blob)).await.unwrap();
2073
2074        assert_ne!(alpha.id, beta.id);
2075    }
2076
2077    #[tokio::test(flavor = "multi_thread")]
2078    async fn fetch_resource_respects_hydration_mode() {
2079        let store = MemoryStore::new();
2080        let blob = vec![9_u8; 512];
2081        let resource = ResourceObject::builder()
2082            .modality(ModalityProfile::Audio)
2083            .checksum("blake3:preview")
2084            .size_bytes(blob.len() as u64)
2085            .location(ResourceLocation::Blob { blob_index: 0 })
2086            .build()
2087            .unwrap();
2088        let resource = persist_resource(&store, resource, Some(blob.clone()))
2089            .await
2090            .unwrap();
2091
2092        let mut artifact = DerivedArtifact::builder()
2093            .resource_id(resource.id)
2094            .kind(DerivedArtifactKind::Transcript)
2095            .modality(ModalityProfile::Text)
2096            .text_content("preview transcript")
2097            .build()
2098            .unwrap();
2099        artifact.created_at = hirn_core::Timestamp::from_millis(artifact.created_at.millis());
2100        persist_derived_artifact(&store, artifact.clone())
2101            .await
2102            .unwrap();
2103
2104        let metadata_only = fetch_resource(&store, resource.id, HydrationMode::MetadataOnly)
2105            .await
2106            .unwrap()
2107            .unwrap();
2108        assert!(metadata_only.artifacts.is_empty());
2109        assert!(metadata_only.blob.is_none());
2110
2111        let preview = fetch_resource(&store, resource.id, HydrationMode::Preview)
2112            .await
2113            .unwrap()
2114            .unwrap();
2115        assert_eq!(preview.artifacts, vec![artifact.clone()]);
2116        assert!(preview.blob.is_none());
2117
2118        let full = fetch_resource(&store, resource.id, HydrationMode::Full)
2119            .await
2120            .unwrap()
2121            .unwrap();
2122        assert_eq!(full.artifacts, vec![artifact]);
2123        assert_eq!(full.blob, Some(blob));
2124    }
2125
2126    #[tokio::test(flavor = "multi_thread")]
2127    async fn persist_default_derived_artifacts_adds_caption_ocr_and_thumbnail_for_images() {
2128        let store = MemoryStore::new();
2129        let source_image = image::DynamicImage::new_rgba8(4, 4);
2130        let mut encoded = Cursor::new(Vec::new());
2131        source_image
2132            .write_to(&mut encoded, ImageFormat::Png)
2133            .unwrap();
2134        let blob = encoded.into_inner();
2135        let resource = ResourceObject::builder()
2136            .modality(ModalityProfile::Image)
2137            .mime_type("image/png")
2138            .location(ResourceLocation::Blob { blob_index: 0 })
2139            .build()
2140            .unwrap();
2141        let resource = persist_resource_with_quota_policy(
2142            &store,
2143            resource,
2144            Some(blob.clone()),
2145            &ResourceQuotaPolicy::default(),
2146        )
2147        .await
2148        .unwrap();
2149        let created = persist_default_derived_artifacts(
2150            &store,
2151            &resource,
2152            EvidenceRole::Source,
2153            DerivedArtifactInput::new("diagram of the auth handshake")
2154                .with_blob(&blob, Some("image/png")),
2155        )
2156        .await
2157        .unwrap();
2158        let links = evidence_links_for_derived_artifacts(&created, Some(0));
2159
2160        let artifacts = list_derived_artifacts(&store, resource.id).await.unwrap();
2161        assert_eq!(artifacts.len(), 3);
2162        assert_eq!(artifacts[0].kind, DerivedArtifactKind::Caption);
2163        assert_eq!(
2164            artifacts[0].text_content.as_deref(),
2165            Some("diagram of the auth handshake")
2166        );
2167        assert_eq!(artifacts[1].kind, DerivedArtifactKind::OcrText);
2168        assert_eq!(
2169            artifacts[1].text_content.as_deref(),
2170            Some("diagram of the auth handshake")
2171        );
2172        assert_eq!(artifacts[2].kind, DerivedArtifactKind::Thumbnail);
2173        assert_eq!(artifacts[2].mime_type.as_deref(), Some("image/png"));
2174        assert_eq!(artifacts[2].blob_index, Some(1));
2175        assert!(artifacts[2].text_content.is_none());
2176        assert!(matches!(
2177            artifacts[1].metadata.get("generation_strategy"),
2178            Some(hirn_core::metadata::MetadataValue::String(value)) if value == "text_surrogate_fallback"
2179        ));
2180        assert!(matches!(
2181            artifacts[1].metadata.get("fallback_source"),
2182            Some(hirn_core::metadata::MetadataValue::String(value)) if value == "image_description"
2183        ));
2184        assert_eq!(links.len(), 3);
2185        assert_eq!(links[0].role, EvidenceRole::Derived);
2186        assert_eq!(links[0].provenance.as_str(), "transformed_summary");
2187        assert_eq!(links[1].role, EvidenceRole::Derived);
2188        assert_eq!(links[1].provenance.as_str(), "generated_artifact");
2189        assert_eq!(links[2].role, EvidenceRole::Preview);
2190        assert_eq!(links[2].provenance.as_str(), "generated_artifact");
2191
2192        let thumbnail_blob = load_resource_blob(&store, resource.id, 1).await.unwrap();
2193        let thumbnail =
2194            image::load_from_memory_with_format(&thumbnail_blob, ImageFormat::Png).unwrap();
2195        assert!(thumbnail.width() <= THUMBNAIL_MAX_DIMENSION_PX);
2196        assert!(thumbnail.height() <= THUMBNAIL_MAX_DIMENSION_PX);
2197    }
2198
2199    #[tokio::test(flavor = "multi_thread")]
2200    async fn persist_default_derived_artifacts_records_generation_failure_for_empty_inputs() {
2201        let store = MemoryStore::new();
2202        let blob = vec![7_u8; 48];
2203        let resource = ResourceObject::builder()
2204            .modality(ModalityProfile::Image)
2205            .mime_type("image/png")
2206            .location(ResourceLocation::Blob { blob_index: 0 })
2207            .build()
2208            .unwrap();
2209        let resource = persist_resource_with_quota_policy(
2210            &store,
2211            resource,
2212            Some(blob.clone()),
2213            &ResourceQuotaPolicy::default(),
2214        )
2215        .await
2216        .unwrap();
2217        let created = persist_default_derived_artifacts(
2218            &store,
2219            &resource,
2220            EvidenceRole::Source,
2221            DerivedArtifactInput::new(""),
2222        )
2223        .await
2224        .unwrap();
2225
2226        let artifacts = list_derived_artifacts(&store, resource.id).await.unwrap();
2227        assert_eq!(artifacts.len(), 2);
2228        assert!(
2229            artifacts
2230                .iter()
2231                .all(|artifact| artifact.kind == DerivedArtifactKind::GenerationFailure)
2232        );
2233        assert!(artifacts.iter().any(|artifact| {
2234            artifact.text_content.as_deref()
2235                == Some("caption generation failed: source text was empty")
2236        }));
2237        assert!(artifacts.iter().any(|artifact| {
2238            artifact.text_content.as_deref()
2239                == Some("thumbnail generation failed: source blob was unavailable")
2240        }));
2241        assert!(evidence_links_for_derived_artifacts(&created, Some(0)).is_empty());
2242
2243        let hydrated = fetch_resource(&store, resource.id, HydrationMode::Full)
2244            .await
2245            .unwrap()
2246            .unwrap();
2247        assert_eq!(hydrated.blob, Some(blob));
2248    }
2249
2250    #[tokio::test(flavor = "multi_thread")]
2251    async fn supersede_resource_promotes_new_head_and_preserves_history() {
2252        let store = MemoryStore::new();
2253        let original_blob = vec![7_u8; 32];
2254        let original = ResourceObject::builder()
2255            .modality(ModalityProfile::Image)
2256            .mime_type("image/png")
2257            .display_name("frame-v1.png")
2258            .checksum(format!("blake3:{}", blake3::hash(&original_blob).to_hex()))
2259            .size_bytes(original_blob.len() as u64)
2260            .location(ResourceLocation::Blob { blob_index: 0 })
2261            .build()
2262            .unwrap();
2263        let original = persist_resource(&store, original, Some(original_blob.clone()))
2264            .await
2265            .unwrap();
2266
2267        let successor_blob = vec![8_u8; 48];
2268        let successor = supersede_resource(
2269            &store,
2270            original.id,
2271            ResourceSupersession {
2272                reason: Some("cropped and re-encoded".into()),
2273                display_name: Some("frame-v2.png".into()),
2274                checksum: Some(format!("blake3:{}", blake3::hash(&successor_blob).to_hex())),
2275                ..ResourceSupersession::default()
2276            },
2277            Some(successor_blob.clone()),
2278        )
2279        .await
2280        .unwrap();
2281
2282        let active_head = get_resource_head(&store, original.id)
2283            .await
2284            .unwrap()
2285            .unwrap();
2286        assert_eq!(active_head.id, successor.id);
2287        assert_eq!(
2288            active_head.logical_resource_id,
2289            original.logical_resource_id
2290        );
2291        assert_eq!(active_head.version, 2);
2292        assert_eq!(active_head.revision_operation, RevisionOperation::Supersede);
2293        assert_eq!(
2294            active_head.revision_reason.as_deref(),
2295            Some("cropped and re-encoded")
2296        );
2297        assert_eq!(active_head.revision_causation_id, Some(original.id));
2298        assert_eq!(active_head.display_name.as_deref(), Some("frame-v2.png"));
2299
2300        let historical = get_resource(&store, original.id).await.unwrap().unwrap();
2301        assert_eq!(historical.superseded_by, Some(successor.id));
2302        assert_eq!(
2303            historical.revision_state_against(&active_head),
2304            RevisionState::Superseded
2305        );
2306
2307        let revisions = list_resource_revisions(&store, original.id).await.unwrap();
2308        assert_eq!(revisions.len(), 2);
2309        assert_eq!(revisions[0].id, original.id);
2310        assert_eq!(revisions[1].id, successor.id);
2311
2312        let historical_fetch = fetch_resource(&store, original.id, HydrationMode::Full)
2313            .await
2314            .unwrap()
2315            .unwrap();
2316        assert_eq!(historical_fetch.blob, Some(original_blob));
2317
2318        let head_fetch = fetch_resource(&store, successor.id, HydrationMode::Full)
2319            .await
2320            .unwrap()
2321            .unwrap();
2322        assert_eq!(head_fetch.blob, Some(successor_blob));
2323    }
2324
2325    #[tokio::test(flavor = "multi_thread")]
2326    async fn reconcile_resource_head_mutations_repairs_missing_backlink() {
2327        let store = MemoryStore::new();
2328        let original_blob = vec![1_u8; 16];
2329        let original = ResourceObject::builder()
2330            .modality(ModalityProfile::Document)
2331            .display_name("draft-v1.txt")
2332            .location(ResourceLocation::Blob { blob_index: 0 })
2333            .build()
2334            .unwrap();
2335        let original = persist_resource(&store, original, Some(original_blob))
2336            .await
2337            .unwrap();
2338
2339        let successor_blob = vec![2_u8; 24];
2340        let now = Timestamp::now();
2341        let mut successor =
2342            build_successor_revision(&original, Some("published revision".to_string()), now);
2343        successor.display_name = Some("draft-v2.txt".into());
2344        successor.location = ResourceLocation::Blob { blob_index: 0 };
2345        successor.size_bytes = successor_blob.len() as u64;
2346        successor.checksum = Some(format!("blake3:{}", blake3::hash(&successor_blob).to_hex()));
2347
2348        let envelope = build_resource_head_transition_envelope(&original, &successor).unwrap();
2349        crate::mutation_envelope_ops::append_mutation_envelope(&store, &envelope)
2350            .await
2351            .unwrap();
2352        append_resource_revision(&store, &successor, Some(successor_blob))
2353            .await
2354            .unwrap();
2355
2356        let reconciled = reconcile_resource_head_mutations(&store).await.unwrap();
2357        assert_eq!(reconciled, 1);
2358
2359        let original_after = get_resource(&store, original.id).await.unwrap().unwrap();
2360        assert_eq!(original_after.superseded_by, Some(successor.id));
2361
2362        let envelope_after = get_mutation_envelope(&store, &envelope.id)
2363            .await
2364            .unwrap()
2365            .unwrap();
2366        assert_eq!(envelope_after.state, MutationEnvelopeState::Applied);
2367    }
2368
2369    #[tokio::test(flavor = "multi_thread")]
2370    async fn supersede_resource_can_preserve_existing_blob_without_new_payload() {
2371        let store = MemoryStore::new();
2372        let blob = vec![3_u8; 24];
2373        let original = ResourceObject::builder()
2374            .modality(ModalityProfile::Document)
2375            .display_name("brief-v1.pdf")
2376            .location(ResourceLocation::Blob { blob_index: 0 })
2377            .build()
2378            .unwrap();
2379        let original = persist_resource(&store, original, Some(blob.clone()))
2380            .await
2381            .unwrap();
2382
2383        let successor = supersede_resource(
2384            &store,
2385            original.id,
2386            ResourceSupersession {
2387                reason: Some("metadata refresh".into()),
2388                display_name: Some("brief-v2.pdf".into()),
2389                ..ResourceSupersession::default()
2390            },
2391            None,
2392        )
2393        .await
2394        .unwrap();
2395
2396        let hydrated = fetch_resource(&store, successor.id, HydrationMode::Full)
2397            .await
2398            .unwrap()
2399            .unwrap();
2400        assert_eq!(hydrated.blob, Some(blob));
2401        assert_eq!(
2402            hydrated.resource.display_name.as_deref(),
2403            Some("brief-v2.pdf")
2404        );
2405        assert_eq!(hydrated.resource.revision_causation_id, Some(original.id));
2406    }
2407
2408    #[tokio::test(flavor = "multi_thread")]
2409    async fn persist_resource_does_not_deduplicate_to_superseded_checksum_match() {
2410        let store = MemoryStore::new();
2411        let original_blob = vec![1_u8; 16];
2412        let original = ResourceObject::builder()
2413            .modality(ModalityProfile::Image)
2414            .checksum(format!("blake3:{}", blake3::hash(&original_blob).to_hex()))
2415            .location(ResourceLocation::Blob { blob_index: 0 })
2416            .build()
2417            .unwrap();
2418        let original = persist_resource(&store, original, Some(original_blob.clone()))
2419            .await
2420            .unwrap();
2421
2422        supersede_resource(
2423            &store,
2424            original.id,
2425            ResourceSupersession {
2426                checksum: Some("blake3:replacement".into()),
2427                ..ResourceSupersession::default()
2428            },
2429            Some(vec![2_u8; 20]),
2430        )
2431        .await
2432        .unwrap();
2433
2434        let replacement_candidate = ResourceObject::builder()
2435            .modality(ModalityProfile::Image)
2436            .checksum(format!("blake3:{}", blake3::hash(&original_blob).to_hex()))
2437            .location(ResourceLocation::Blob { blob_index: 0 })
2438            .build()
2439            .unwrap();
2440        let replacement = persist_resource(&store, replacement_candidate, Some(original_blob))
2441            .await
2442            .unwrap();
2443
2444        assert_ne!(replacement.id, original.id);
2445        assert_ne!(
2446            replacement.logical_resource_id,
2447            original.logical_resource_id
2448        );
2449    }
2450
2451    #[tokio::test(flavor = "multi_thread")]
2452    async fn load_resource_blob_requires_visible_resource_metadata() {
2453        struct TestPolicy;
2454
2455        #[async_trait::async_trait]
2456        impl NamespacePolicy for TestPolicy {
2457            async fn allowed_namespaces(&self, principal: &str) -> Option<Vec<String>> {
2458                match principal {
2459                    "allowed" => Some(vec!["default".to_string()]),
2460                    "blocked" => Some(vec!["blocked".to_string()]),
2461                    _ => Some(Vec::new()),
2462                }
2463            }
2464        }
2465
2466        let store = PolicyEnforcedStore::new(MemoryStore::new(), Arc::new(TestPolicy));
2467        let blob = vec![4_u8; 64];
2468        let resource = ResourceObject::builder()
2469            .modality(ModalityProfile::Image)
2470            .location(ResourceLocation::Blob { blob_index: 0 })
2471            .build()
2472            .unwrap();
2473        let resource = CURRENT_PRINCIPAL
2474            .scope("allowed".to_string(), async {
2475                persist_resource(&store, resource, Some(blob.clone())).await
2476            })
2477            .await
2478            .unwrap();
2479
2480        let visible = CURRENT_PRINCIPAL
2481            .scope("allowed".to_string(), async {
2482                load_resource_blob(&store, resource.id, 0).await
2483            })
2484            .await
2485            .unwrap();
2486        assert_eq!(visible, blob);
2487
2488        let denied = CURRENT_PRINCIPAL
2489            .scope("blocked".to_string(), async {
2490                load_resource_blob(&store, resource.id, 0).await
2491            })
2492            .await
2493            .unwrap_err();
2494        assert!(matches!(denied, HirnDbError::BlobError { .. }));
2495    }
2496
2497    #[tokio::test(flavor = "multi_thread")]
2498    async fn redact_resource_blocks_payload_hydration_and_keeps_placeholder_head() {
2499        let store = MemoryStore::new();
2500        let blob = vec![6_u8; 96];
2501        let resource = ResourceObject::builder()
2502            .modality(ModalityProfile::Document)
2503            .mime_type("application/pdf")
2504            .display_name("roadmap.pdf")
2505            .location(ResourceLocation::Blob { blob_index: 0 })
2506            .build()
2507            .unwrap();
2508        let resource = persist_resource(&store, resource, Some(blob.clone()))
2509            .await
2510            .unwrap();
2511
2512        let artifact = DerivedArtifact::builder()
2513            .resource_id(resource.id)
2514            .kind(DerivedArtifactKind::Preview)
2515            .modality(ModalityProfile::Text)
2516            .text_content("preview text")
2517            .build()
2518            .unwrap();
2519        persist_derived_artifact(&store, artifact).await.unwrap();
2520
2521        let redacted = redact_resource(
2522            &store,
2523            resource.id,
2524            ResourceGovernanceUpdate {
2525                reason: Some("contains sensitive evidence".into()),
2526                placeholder_display_name: Some("redacted evidence".into()),
2527            },
2528        )
2529        .await
2530        .unwrap();
2531
2532        assert_eq!(redacted.governance_state, ResourceGovernanceState::Redacted);
2533        assert_eq!(redacted.display_name.as_deref(), Some("redacted evidence"));
2534
2535        let historical = get_resource(&store, resource.id).await.unwrap().unwrap();
2536        assert_eq!(
2537            historical.governance_state,
2538            ResourceGovernanceState::Redacted
2539        );
2540        assert_eq!(
2541            historical.display_name.as_deref(),
2542            Some("redacted evidence")
2543        );
2544        assert!(historical.mime_type.is_none());
2545        assert_eq!(historical.size_bytes, 0);
2546
2547        let preview = fetch_resource(&store, resource.id, HydrationMode::Preview)
2548            .await
2549            .unwrap()
2550            .unwrap();
2551        assert!(preview.artifacts.is_empty());
2552        assert!(preview.blob.is_none());
2553
2554        let full = fetch_resource(&store, redacted.id, HydrationMode::Full)
2555            .await
2556            .unwrap()
2557            .unwrap();
2558        assert!(full.artifacts.is_empty());
2559        assert!(full.blob.is_none());
2560
2561        let blob_err = load_resource_blob(&store, resource.id, 0)
2562            .await
2563            .unwrap_err();
2564        assert!(matches!(blob_err, HirnDbError::BlobError { .. }));
2565
2566        let remaining_blobs = store
2567            .scan(blob_ds::DATASET_NAME, ScanOptions::default())
2568            .await
2569            .unwrap();
2570        let remaining_artifacts = store
2571            .scan(artifact_ds::DATASET_NAME, ScanOptions::default())
2572            .await
2573            .unwrap();
2574        assert_eq!(
2575            remaining_blobs
2576                .iter()
2577                .map(|batch| batch.num_rows())
2578                .sum::<usize>(),
2579            0
2580        );
2581        assert_eq!(
2582            remaining_artifacts
2583                .iter()
2584                .map(|batch| batch.num_rows())
2585                .sum::<usize>(),
2586            0
2587        );
2588    }
2589
2590    #[tokio::test(flavor = "multi_thread")]
2591    async fn purge_resource_marks_lineage_as_purged() {
2592        let store = MemoryStore::new();
2593        let blob = vec![2_u8; 40];
2594        let resource = ResourceObject::builder()
2595            .modality(ModalityProfile::Image)
2596            .display_name("frame.png")
2597            .location(ResourceLocation::Blob { blob_index: 0 })
2598            .build()
2599            .unwrap();
2600        let resource = persist_resource(&store, resource, Some(blob))
2601            .await
2602            .unwrap();
2603
2604        let purged = purge_resource(&store, resource.id, ResourceGovernanceUpdate::default())
2605            .await
2606            .unwrap();
2607        assert_eq!(purged.governance_state, ResourceGovernanceState::Purged);
2608        assert_eq!(purged.display_name.as_deref(), Some("purged resource"));
2609
2610        let head = get_resource_head(&store, resource.id)
2611            .await
2612            .unwrap()
2613            .unwrap();
2614        assert_eq!(head.id, purged.id);
2615        assert_eq!(head.governance_state, ResourceGovernanceState::Purged);
2616
2617        let historical = get_resource(&store, resource.id).await.unwrap().unwrap();
2618        assert_eq!(historical.governance_state, ResourceGovernanceState::Purged);
2619        assert!(historical.mime_type.is_none());
2620        assert_eq!(historical.size_bytes, 0);
2621
2622        let revisions = list_resource_revisions(&store, resource.id).await.unwrap();
2623        assert_eq!(revisions.len(), 2);
2624        assert_eq!(revisions[0].id, resource.id);
2625        assert_eq!(revisions[1].id, purged.id);
2626    }
2627
2628    #[tokio::test(flavor = "multi_thread")]
2629    async fn retention_policy_targets_modality_and_classification() {
2630        let store = MemoryStore::new();
2631
2632        let image_restricted = persist_resource(
2633            &store,
2634            ResourceObject::builder()
2635                .modality(ModalityProfile::Image)
2636                .metadata_entry("classification", "restricted")
2637                .location(ResourceLocation::Blob { blob_index: 0 })
2638                .build()
2639                .unwrap(),
2640            Some(vec![1_u8; 24]),
2641        )
2642        .await
2643        .unwrap();
2644        let image_public = persist_resource(
2645            &store,
2646            ResourceObject::builder()
2647                .modality(ModalityProfile::Image)
2648                .metadata_entry("classification", "public")
2649                .location(ResourceLocation::Blob { blob_index: 0 })
2650                .build()
2651                .unwrap(),
2652            Some(vec![2_u8; 24]),
2653        )
2654        .await
2655        .unwrap();
2656        let document_restricted = persist_resource(
2657            &store,
2658            ResourceObject::builder()
2659                .modality(ModalityProfile::Document)
2660                .metadata_entry("classification", "restricted")
2661                .location(ResourceLocation::Blob { blob_index: 0 })
2662                .build()
2663                .unwrap(),
2664            Some(vec![3_u8; 24]),
2665        )
2666        .await
2667        .unwrap();
2668        let document_public = persist_resource(
2669            &store,
2670            ResourceObject::builder()
2671                .modality(ModalityProfile::Document)
2672                .metadata_entry("classification", "public")
2673                .location(ResourceLocation::Blob { blob_index: 0 })
2674                .build()
2675                .unwrap(),
2676            Some(vec![4_u8; 24]),
2677        )
2678        .await
2679        .unwrap();
2680
2681        let policy = ResourceRetentionPolicy::default()
2682            .with_rule(
2683                ResourceRetentionRule::new(ResourceRetentionAction::Redact)
2684                    .classification("restricted"),
2685            )
2686            .with_rule(
2687                ResourceRetentionRule::new(ResourceRetentionAction::Purge)
2688                    .modality(ModalityProfile::Image),
2689            );
2690
2691        let result = apply_resource_retention_policy(&store, &policy)
2692            .await
2693            .unwrap();
2694        assert_eq!(result.scanned_active_heads, 4);
2695        assert_eq!(result.governed_resources, 3);
2696        assert_eq!(result.redacted_resources, 1);
2697        assert_eq!(result.purged_resources, 2);
2698        assert_eq!(result.skipped_resources, 0);
2699
2700        let image_restricted = get_resource_head(&store, image_restricted.id)
2701            .await
2702            .unwrap()
2703            .unwrap();
2704        assert_eq!(
2705            image_restricted.governance_state,
2706            ResourceGovernanceState::Purged
2707        );
2708
2709        let image_public = get_resource_head(&store, image_public.id)
2710            .await
2711            .unwrap()
2712            .unwrap();
2713        assert_eq!(
2714            image_public.governance_state,
2715            ResourceGovernanceState::Purged
2716        );
2717
2718        let document_restricted = get_resource_head(&store, document_restricted.id)
2719            .await
2720            .unwrap()
2721            .unwrap();
2722        assert_eq!(
2723            document_restricted.governance_state,
2724            ResourceGovernanceState::Redacted
2725        );
2726
2727        let document_public = get_resource_head(&store, document_public.id)
2728            .await
2729            .unwrap()
2730            .unwrap();
2731        assert_eq!(
2732            document_public.governance_state,
2733            ResourceGovernanceState::Active
2734        );
2735    }
2736
2737    #[tokio::test(flavor = "multi_thread")]
2738    async fn persist_resource_with_quota_policy_blocks_namespace_limit() {
2739        let store = MemoryStore::new();
2740        let namespace = Namespace::new("quota-ns").unwrap();
2741        let policy = ResourceQuotaPolicy::default().with_rule(
2742            ResourceQuotaRule::new(ResourceQuotaScope::Namespace(namespace))
2743                .max_active_resources(1),
2744        );
2745
2746        let first = ResourceObject::builder()
2747            .modality(ModalityProfile::Document)
2748            .location(ResourceLocation::Blob { blob_index: 0 })
2749            .namespace(namespace)
2750            .build()
2751            .unwrap();
2752        persist_resource_with_quota_policy(&store, first, Some(vec![1_u8; 16]), &policy)
2753            .await
2754            .unwrap();
2755
2756        let second = ResourceObject::builder()
2757            .modality(ModalityProfile::Document)
2758            .location(ResourceLocation::Blob { blob_index: 0 })
2759            .namespace(namespace)
2760            .build()
2761            .unwrap();
2762        let error =
2763            persist_resource_with_quota_policy(&store, second, Some(vec![2_u8; 16]), &policy)
2764                .await
2765                .unwrap_err();
2766
2767        assert!(
2768            matches!(error, HirnDbError::LimitExceeded(message) if message.contains("namespace `quota-ns`") && message.contains("active resources"))
2769        );
2770    }
2771
2772    #[tokio::test(flavor = "multi_thread")]
2773    async fn supersede_resource_with_quota_policy_reuses_the_active_head_slot() {
2774        let store = MemoryStore::new();
2775        let namespace = Namespace::new("quota-replace").unwrap();
2776        let policy = ResourceQuotaPolicy::default().with_rule(
2777            ResourceQuotaRule::new(ResourceQuotaScope::Namespace(namespace))
2778                .max_active_resources(1),
2779        );
2780
2781        let original = ResourceObject::builder()
2782            .modality(ModalityProfile::Document)
2783            .location(ResourceLocation::Blob { blob_index: 0 })
2784            .namespace(namespace)
2785            .build()
2786            .unwrap();
2787        let original =
2788            persist_resource_with_quota_policy(&store, original, Some(vec![1_u8; 16]), &policy)
2789                .await
2790                .unwrap();
2791
2792        let successor = supersede_resource_with_quota_policy(
2793            &store,
2794            original.id,
2795            ResourceSupersession {
2796                display_name: Some("replacement.pdf".into()),
2797                ..Default::default()
2798            },
2799            Some(vec![2_u8; 24]),
2800            &policy,
2801        )
2802        .await
2803        .unwrap();
2804
2805        let head = get_resource_head(&store, original.id)
2806            .await
2807            .unwrap()
2808            .unwrap();
2809        assert_eq!(head.id, successor.id);
2810        assert_eq!(head.display_name.as_deref(), Some("replacement.pdf"));
2811        let revisions = list_resource_revisions(&store, original.id).await.unwrap();
2812        assert_eq!(revisions.len(), 2);
2813    }
2814}