use std::time::{Instant, SystemTime};
use smallvec::SmallVec;
use thiserror::Error;
use crate::audit::{UserAuditEvent, UserAuditSink};
use crate::encryption::{
resolve_rotation_generation, AtRestHooks, CodecError, CodecErrorClass, CodecId, DecodeContext,
EncodeContext, EncodedRecord, RotationContext,
};
use crate::identity::TraceId;
use crate::proto::{AtUri, Did, Nsid, RecordKey};
use crate::read_pipeline::ReadAuthorization;
use crate::target::TargetRepresentation;
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct RecordContentContext {
pub nsid: Nsid,
pub rkey: RecordKey,
pub originator: Did,
pub audience_list: Option<AtUri>,
pub requester: Did,
pub subject_repr: TargetRepresentation,
pub trace_id: TraceId,
pub operator_context: SmallVec<[(String, Vec<u8>); 2]>,
}
impl RecordContentContext {
#[must_use]
#[allow(clippy::too_many_arguments)]
pub fn new(
nsid: Nsid,
rkey: RecordKey,
originator: Did,
audience_list: Option<AtUri>,
requester: Did,
subject_repr: TargetRepresentation,
trace_id: TraceId,
operator_context: SmallVec<[(String, Vec<u8>); 2]>,
) -> Self {
RecordContentContext {
nsid,
rkey,
originator,
audience_list,
requester,
subject_repr,
trace_id,
operator_context,
}
}
}
pub async fn encode_record_content(
hooks: &dyn AtRestHooks,
user_sink: &dyn UserAuditSink,
plaintext: &[u8],
ctx: &RecordContentContext,
deadline: Instant,
now: SystemTime,
) -> Result<EncodedRecord, CodecError> {
let codec = hooks.content_codec();
let codec_id = codec.codec_id();
let rotation_ctx = RotationContext {
originator: ctx.originator.clone(),
nsid: ctx.nsid.clone(),
audience_list: ctx.audience_list.clone(),
};
let oracle = hooks.rotation_oracle();
let generation =
match resolve_rotation_generation(oracle.as_deref(), &codec_id, &rotation_ctx, now) {
Ok(g) => g,
Err(e) => {
emit_encode_failed(user_sink, ctx, codec_id, e.class(), now);
return Err(e);
}
};
let encode_ctx = EncodeContext {
nsid: ctx.nsid.clone(),
rkey: ctx.rkey.clone(),
originator: ctx.originator.clone(),
audience_list: ctx.audience_list.clone(),
current_generation_hint: generation.clone(),
trace_id: ctx.trace_id,
operator_context: ctx.operator_context.clone(),
};
let content = match codec.encode(plaintext, &encode_ctx, deadline).await {
Ok(c) => c,
Err(e) => {
emit_encode_failed(user_sink, ctx, codec_id, e.class(), now);
return Err(e);
}
};
let record = EncodedRecord {
codec: codec_id.clone(),
content,
generation: generation.clone(),
};
let _ = user_sink.record(UserAuditEvent::ContentEncoded {
trace_id: ctx.trace_id,
requester: ctx.requester.clone(),
subject_repr: ctx.subject_repr.clone(),
codec: codec_id,
generation,
at: now,
});
Ok(record)
}
pub async fn decode_record_content(
authz: &ReadAuthorization,
hooks: &dyn AtRestHooks,
user_sink: &dyn UserAuditSink,
encoded: &EncodedRecord,
ctx: &RecordContentContext,
deadline: Instant,
now: SystemTime,
) -> Result<Vec<u8>, CodecError> {
let codec = hooks.content_codec();
let installed = codec.codec_id();
if encoded.codec != installed {
let err = CodecError::UnknownOrWrongCodec {
stored: encoded.codec.clone(),
installed: installed.clone(),
};
emit_decode_failed(
user_sink,
ctx,
authz.reader(),
Some(installed),
Some(encoded.codec.clone()),
err.class(),
now,
);
return Err(err);
}
let decode_ctx = DecodeContext {
nsid: ctx.nsid.clone(),
rkey: ctx.rkey.clone(),
originator: ctx.originator.clone(),
audience_list: ctx.audience_list.clone(),
trace_id: ctx.trace_id,
operator_context: ctx.operator_context.clone(),
};
match codec.decode(encoded, &decode_ctx, deadline).await {
Ok(plaintext) => Ok(plaintext),
Err(e) => {
emit_decode_failed(user_sink, ctx, authz.reader(), Some(installed), None, e.class(), now);
Err(e)
}
}
}
fn emit_encode_failed(
user_sink: &dyn UserAuditSink,
ctx: &RecordContentContext,
codec: CodecId,
error_class: CodecErrorClass,
at: SystemTime,
) {
let _ = user_sink.record(UserAuditEvent::ContentEncodeFailed {
trace_id: ctx.trace_id,
requester: ctx.requester.clone(),
subject_repr: ctx.subject_repr.clone(),
codec,
error_class,
at,
});
}
fn emit_decode_failed(
user_sink: &dyn UserAuditSink,
ctx: &RecordContentContext,
reader: &Did,
codec: Option<CodecId>,
stored_codec: Option<CodecId>,
error_class: CodecErrorClass,
at: SystemTime,
) {
let _ = user_sink.record(UserAuditEvent::ContentDecodeFailed {
trace_id: ctx.trace_id,
requester: reader.clone(),
subject_repr: ctx.subject_repr.clone(),
codec,
stored_codec,
error_class,
at,
});
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum AtRestInstallError {
#[error("codec {codec} requires a RotationOracle, but none is installed")]
CodecRequiresRotation {
codec: CodecId,
},
#[error("codec {codec} requires rotation, but the installed oracle yields no generation")]
OracleYieldsNoGeneration {
codec: CodecId,
},
}
pub fn validate_at_rest_install(hooks: &dyn AtRestHooks) -> Result<(), AtRestInstallError> {
let codec = hooks.content_codec();
if codec.requires_rotation() {
match hooks.rotation_oracle() {
None => {
return Err(AtRestInstallError::CodecRequiresRotation {
codec: codec.codec_id(),
});
}
Some(oracle) => {
if oracle
.current_generation(&RotationContext::for_install_probe())
.is_none()
{
return Err(AtRestInstallError::OracleYieldsNoGeneration {
codec: codec.codec_id(),
});
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use super::*;
use crate::audit::AuditError;
use crate::encryption::{
AuditEncryptionResolver, ContentCodec, RotationGenerationMark, RotationOracle,
};
use crate::{StructuralRepresentation, TargetRepresentation};
struct StubCodec {
id: CodecId,
encode: Result<Vec<u8>, CodecError>,
decode: Result<Vec<u8>, CodecError>,
requires_rotation: bool,
}
#[async_trait]
impl ContentCodec for StubCodec {
fn codec_id(&self) -> CodecId {
self.id.clone()
}
fn requires_rotation(&self) -> bool {
self.requires_rotation
}
async fn encode(
&self,
_plaintext: &[u8],
_context: &EncodeContext,
_deadline: Instant,
) -> Result<Vec<u8>, CodecError> {
self.encode.clone()
}
async fn decode(
&self,
_encoded: &EncodedRecord,
_context: &DecodeContext,
_deadline: Instant,
) -> Result<Vec<u8>, CodecError> {
self.decode.clone()
}
}
struct LegacyTextCodec;
#[async_trait]
impl ContentCodec for LegacyTextCodec {
fn codec_id(&self) -> CodecId {
CodecId::new("legacy-text/0.0").unwrap()
}
async fn encode(
&self,
_plaintext: &[u8],
_context: &EncodeContext,
_deadline: Instant,
) -> Result<Vec<u8>, CodecError> {
panic!("LegacyTextCodec is decode-only (legacy text read path); encode must not be called")
}
async fn decode(
&self,
encoded: &EncodedRecord,
_context: &DecodeContext,
_deadline: Instant,
) -> Result<Vec<u8>, CodecError> {
Ok(encoded.content.clone())
}
}
struct StubOracle {
generation: Option<RotationGenerationMark>,
synced: SystemTime,
bound: Duration,
}
impl RotationOracle for StubOracle {
fn current_generation(&self, _ctx: &RotationContext) -> Option<RotationGenerationMark> {
self.generation.clone()
}
fn last_synced_at(&self) -> SystemTime {
self.synced
}
fn data_freshness_bound(&self) -> Duration {
self.bound
}
}
struct StubHooks {
codec: Arc<dyn ContentCodec>,
oracle: Option<Arc<dyn RotationOracle>>,
}
impl AtRestHooks for StubHooks {
fn audit(&self) -> Option<Arc<dyn AuditEncryptionResolver>> {
None
}
fn content_codec(&self) -> Arc<dyn ContentCodec> {
self.codec.clone()
}
fn rotation_oracle(&self) -> Option<Arc<dyn RotationOracle>> {
self.oracle.clone()
}
}
#[derive(Default)]
struct CapturingSink {
events: Mutex<Vec<UserAuditEvent>>,
}
impl UserAuditSink for CapturingSink {
fn record(&self, event: UserAuditEvent) -> Result<(), AuditError> {
self.events.lock().unwrap().push(event);
Ok(())
}
}
fn codec_id() -> CodecId {
CodecId::new("laquna/0.2").unwrap()
}
fn other_codec_id() -> CodecId {
CodecId::new("other/1.0").unwrap()
}
fn ctx() -> RecordContentContext {
let did = Did::new("did:plc:exampleexampleexample").unwrap();
RecordContentContext::new(
Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
RecordKey::new("3kabcdefghij2").unwrap(),
did.clone(),
None,
did.clone(),
TargetRepresentation::structural_only(StructuralRepresentation::Resource {
did,
nsid: Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
}),
TraceId::from_bytes([7; 16]),
SmallVec::new(),
)
}
fn deadline() -> Instant {
Instant::now() + Duration::from_secs(30)
}
fn authz() -> ReadAuthorization {
ReadAuthorization::new_for_test(Did::new("did:plc:exampleexampleexample").unwrap())
}
fn fresh_oracle(mark: &str) -> Arc<dyn RotationOracle> {
Arc::new(StubOracle {
generation: Some(RotationGenerationMark::new(mark).unwrap()),
synced: SystemTime::now(),
bound: Duration::from_secs(3600),
})
}
fn hooks_with(
encode: Result<Vec<u8>, CodecError>,
decode: Result<Vec<u8>, CodecError>,
oracle: Option<Arc<dyn RotationOracle>>,
) -> StubHooks {
StubHooks {
codec: Arc::new(StubCodec {
id: codec_id(),
encode,
decode,
requires_rotation: false,
}),
oracle,
}
}
#[tokio::test]
async fn encode_success_stamps_metadata_and_emits() {
let hooks = hooks_with(Ok(b"CIPHER".to_vec()), Ok(vec![]), Some(fresh_oracle("000042")));
let sink = CapturingSink::default();
let now = SystemTime::now();
let rec = encode_record_content(&hooks, &sink, b"hi", &ctx(), deadline(), now)
.await
.unwrap();
assert_eq!(rec.codec, codec_id());
assert_eq!(rec.content, b"CIPHER");
assert_eq!(rec.generation.as_ref().unwrap().as_str(), "000042");
let events = sink.events.lock().unwrap();
assert!(matches!(
events.as_slice(),
[UserAuditEvent::ContentEncoded { .. }]
));
}
#[tokio::test]
async fn encode_stale_rotation_fails_and_emits() {
let stale = Arc::new(StubOracle {
generation: Some(RotationGenerationMark::new("000042").unwrap()),
synced: SystemTime::now() - Duration::from_secs(7200),
bound: Duration::from_secs(3600),
});
let hooks = hooks_with(Ok(b"x".to_vec()), Ok(vec![]), Some(stale));
let sink = CapturingSink::default();
let err = encode_record_content(&hooks, &sink, b"hi", &ctx(), deadline(), SystemTime::now())
.await
.unwrap_err();
assert_eq!(err.class(), CodecErrorClass::RotationStateUnavailable);
let events = sink.events.lock().unwrap();
assert!(matches!(
events.as_slice(),
[UserAuditEvent::ContentEncodeFailed {
error_class: CodecErrorClass::RotationStateUnavailable,
..
}]
));
}
#[tokio::test]
async fn encode_codec_error_fails_and_emits() {
let hooks = hooks_with(
Err(CodecError::BackendUnavailable {
detail: "down".into(),
}),
Ok(vec![]),
Some(fresh_oracle("000042")),
);
let sink = CapturingSink::default();
let err = encode_record_content(&hooks, &sink, b"hi", &ctx(), deadline(), SystemTime::now())
.await
.unwrap_err();
assert_eq!(err.class(), CodecErrorClass::BackendUnavailable);
assert!(matches!(
sink.events.lock().unwrap().as_slice(),
[UserAuditEvent::ContentEncodeFailed { .. }]
));
}
fn encoded_under(codec: CodecId) -> EncodedRecord {
EncodedRecord {
codec,
content: b"CIPHER".to_vec(),
generation: None,
}
}
#[tokio::test]
async fn decode_success() {
let hooks = hooks_with(Ok(vec![]), Ok(b"PLAIN".to_vec()), None);
let sink = CapturingSink::default();
let out = decode_record_content(
&authz(),
&hooks,
&sink,
&encoded_under(codec_id()),
&ctx(),
deadline(),
SystemTime::now(),
)
.await
.unwrap();
assert_eq!(out, b"PLAIN");
assert!(sink.events.lock().unwrap().is_empty());
}
#[tokio::test]
async fn decode_legacy_text_record_returns_text_unchanged() {
let hooks = StubHooks {
codec: Arc::new(LegacyTextCodec),
oracle: None,
};
let sink = CapturingSink::default();
let record = EncodedRecord {
codec: CodecId::new("legacy-text/0.0").unwrap(),
content: b"a legacy plaintext post".to_vec(),
generation: None,
};
let out = decode_record_content(
&authz(),
&hooks,
&sink,
&record,
&ctx(),
deadline(),
SystemTime::now(),
)
.await
.unwrap();
assert_eq!(out, b"a legacy plaintext post");
assert!(sink.events.lock().unwrap().is_empty());
}
#[tokio::test]
async fn decode_codec_mismatch_fails_and_emits() {
let hooks = hooks_with(Ok(vec![]), Ok(b"PLAIN".to_vec()), None);
let sink = CapturingSink::default();
let err = decode_record_content(
&authz(),
&hooks,
&sink,
&encoded_under(other_codec_id()),
&ctx(),
deadline(),
SystemTime::now(),
)
.await
.unwrap_err();
assert!(matches!(err, CodecError::UnknownOrWrongCodec { .. }));
let events = sink.events.lock().unwrap();
assert!(matches!(
events.as_slice(),
[UserAuditEvent::ContentDecodeFailed {
codec: Some(_),
stored_codec: Some(_),
error_class: CodecErrorClass::UnknownOrWrongCodec,
..
}]
));
}
#[tokio::test]
async fn decode_codec_error_fails_and_emits_with_no_stored_codec() {
let hooks = hooks_with(Ok(vec![]), Err(CodecError::Malformed { codec: codec_id() }), None);
let sink = CapturingSink::default();
let err = decode_record_content(
&authz(),
&hooks,
&sink,
&encoded_under(codec_id()),
&ctx(),
deadline(),
SystemTime::now(),
)
.await
.unwrap_err();
assert!(matches!(err, CodecError::Malformed { .. }));
assert!(matches!(
sink.events.lock().unwrap().as_slice(),
[UserAuditEvent::ContentDecodeFailed {
stored_codec: None,
error_class: CodecErrorClass::Malformed,
..
}]
));
}
fn codec_requiring_rotation() -> Arc<dyn ContentCodec> {
Arc::new(StubCodec {
id: codec_id(),
encode: Ok(vec![]),
decode: Ok(vec![]),
requires_rotation: true,
})
}
#[test]
fn install_codec_requires_rotation_without_oracle_fails_closed() {
let hooks = StubHooks {
codec: codec_requiring_rotation(),
oracle: None,
};
assert!(matches!(
validate_at_rest_install(&hooks),
Err(AtRestInstallError::CodecRequiresRotation { .. })
));
}
#[test]
fn install_codec_requires_rotation_with_oracle_ok() {
let hooks = StubHooks {
codec: codec_requiring_rotation(),
oracle: Some(fresh_oracle("000042")),
};
assert!(validate_at_rest_install(&hooks).is_ok());
}
#[test]
fn install_codec_not_requiring_rotation_without_oracle_ok() {
let no_req = hooks_with(Ok(vec![]), Ok(vec![]), None);
assert!(validate_at_rest_install(&no_req).is_ok());
}
fn fixup_tmp_dir(tag: &str) -> std::path::PathBuf {
std::env::temp_dir().join(format!("kryphocron-fixup-{}-{}", std::process::id(), tag))
}
#[test]
fn validate_install_rejects_norotation_oracle_with_rotating_codec() {
use crate::encryption::{DefaultAtRestHooks, NoRotationOracle};
let dir = fixup_tmp_dir("norotation");
let hooks = DefaultAtRestHooks::builder(dir.clone())
.with_rotation_oracle(Arc::new(NoRotationOracle))
.build()
.expect("build (no fallible inner construction)");
match validate_at_rest_install(&hooks) {
Err(AtRestInstallError::OracleYieldsNoGeneration { codec }) => {
assert_eq!(codec.as_str(), "laquna/0.2");
}
other => panic!("expected OracleYieldsNoGeneration, got {other:?}"),
}
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn validate_install_passes_for_healthy_default_oracle() {
use crate::encryption::DefaultAtRestHooks;
let dir = fixup_tmp_dir("healthy");
let hooks = DefaultAtRestHooks::for_data_dir(dir.clone()).expect("build");
assert!(validate_at_rest_install(&hooks).is_ok());
let _ = std::fs::remove_dir_all(&dir);
}
#[tokio::test]
async fn default_hooks_encode_then_decode_real_record() {
use crate::encryption::DefaultAtRestHooks;
let dir = fixup_tmp_dir("e2e");
let hooks = DefaultAtRestHooks::for_data_dir(dir.clone()).expect("install hooks");
validate_at_rest_install(&hooks).expect("install validates");
let sink = CapturingSink::default();
let now = SystemTime::now();
let plaintext = b"this is the constitutional-claim plaintext sentinel";
let encoded = encode_record_content(&hooks, &sink, plaintext, &ctx(), deadline(), now)
.await
.expect("encode succeeds");
assert!(
!encoded
.content
.windows(plaintext.len())
.any(|w| w == plaintext),
"encoded bytes contain the plaintext sentinel — encoding-at-default failed"
);
assert_eq!(encoded.codec.as_str(), "laquna/0.2");
let decoded =
decode_record_content(&authz(), &hooks, &sink, &encoded, &ctx(), deadline(), now)
.await
.expect("decode succeeds");
assert_eq!(decoded.as_slice(), plaintext.as_slice());
let _ = std::fs::remove_dir_all(&dir);
}
}