use std::io;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use crate::encryption::{
CodecError, CodecId, ContentCodec, DecodeContext, EncodeContext, EncodedRecord,
RotationContext, RotationGenerationMark, RotationOracle,
};
use crate::proto::{Did, Nsid, RecordKey};
/// Rotation cadence used when the caller does not configure one:
/// 86 400 seconds (24 hours).
const DEFAULT_CADENCE: Duration = Duration::from_secs(86_400);
// Private submodule providing the `encode` / `decode` primitives the codec
// below calls; only its `DecodeError` type is re-exported.
mod internal;
pub use internal::DecodeError;
/// Content codec registered under the id `laquna/0.2`.
///
/// Each record is encoded and decoded under a seed derived according to
/// `seed_policy`.
#[derive(Clone)]
pub struct Codec {
// How the per-record seed bytes are derived.
seed_policy: SeedPolicy,
}
/// Strategy for deriving the seed bytes handed to the encode/decode primitives.
#[derive(Clone)]
pub enum SeedPolicy {
/// Seed is the UTF-8 bytes of `originator||nsid||rkey`.
DidNsidRkey,
/// Seed is produced by a caller-supplied closure.
///
/// During decode the closure receives an `EncodeContext` rebuilt from the
/// `DecodeContext`, whose `current_generation_hint` is always `None`, so the
/// closure must not depend on that field if round-trips are to succeed.
// The closure type is spelled inline on purpose; an alias would only be
// used here.
#[allow(clippy::type_complexity)]
Custom(Arc<dyn Fn(&EncodeContext) -> Vec<u8> + Send + Sync>),
}
impl Default for Codec {
    /// Builds a codec that derives seeds with [`SeedPolicy::DidNsidRkey`].
    fn default() -> Self {
        Self::new(SeedPolicy::DidNsidRkey)
    }
}
impl Codec {
#[must_use]
pub fn new(seed_policy: SeedPolicy) -> Self {
Self { seed_policy }
}
}
impl std::fmt::Debug for Codec {
    /// Renders the codec as a `Codec { seed_policy: .. }` debug struct.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("Codec");
        rendered.field("seed_policy", &self.seed_policy);
        rendered.finish()
    }
}
impl std::fmt::Debug for SeedPolicy {
    /// Writes the variant name; the closure held by `Custom` is opaque and is
    /// shown as a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            SeedPolicy::Custom(_) => "Custom(<closure>)",
            SeedPolicy::DidNsidRkey => "DidNsidRkey",
        };
        f.write_str(label)
    }
}
/// Derives the default seed: the bytes of `originator`, `nsid` and `rkey`
/// joined with `||` separators, in that order.
fn derive_seed_did_nsid_rkey(originator: &Did, nsid: &Nsid, rkey: &RecordKey) -> Vec<u8> {
    let components = [originator.as_str(), nsid.as_str(), rkey.as_str()];
    components.join("||").into_bytes()
}
fn decode_ctx_as_encode_ctx(ctx: &DecodeContext) -> EncodeContext {
EncodeContext {
nsid: ctx.nsid.clone(),
rkey: ctx.rkey.clone(),
originator: ctx.originator.clone(),
audience_list: ctx.audience_list.clone(),
current_generation_hint: None,
trace_id: ctx.trace_id,
operator_context: ctx.operator_context.clone(),
}
}
/// Extracts the 32-byte slug from a mark shaped `laquna/<secs>/<hex slug>`.
///
/// Returns `None` when the mark does not start with the `laquna/` segment,
/// has fewer than three `/`-separated parts, or its third part is not exactly
/// 64 hex characters. The middle (timestamp) segment is not inspected.
fn parse_slug_from_mark(mark: &RotationGenerationMark) -> Option<[u8; 32]> {
    let text: &str = mark.as_str();
    // Everything after the first '/' — only present when the first segment
    // is exactly "laquna".
    let after_prefix = text.strip_prefix("laquna/")?;
    // The slug is whatever follows the second '/', taken verbatim.
    let (_unix_secs, hex_slug) = after_prefix.split_once('/')?;
    if hex_slug.len() != 64 {
        return None;
    }
    let mut decoded = [0u8; 32];
    match hex::decode_to_slice(hex_slug, &mut decoded) {
        Ok(()) => Some(decoded),
        Err(_) => None,
    }
}
#[async_trait]
impl ContentCodec for Codec {
    /// Identifier under which this codec is registered: `laquna/0.2`.
    fn codec_id(&self) -> CodecId {
        CodecId::new("laquna/0.2").expect("\"laquna/0.2\" is a valid codec id")
    }

    /// This codec always needs a rotation generation mark to encode.
    fn requires_rotation(&self) -> bool {
        true
    }

    /// Encodes `plaintext` under the seed chosen by the seed policy and the
    /// slug carried in `context.current_generation_hint`.
    ///
    /// # Errors
    ///
    /// Returns `CodecError::RotationStateUnavailable` when the context has no
    /// generation hint.
    ///
    /// # Panics
    ///
    /// Panics when the hint is present but is not in the
    /// `laquna/<secs>/<hex slug>` layout, and (debug builds only) when the
    /// derived seed is empty.
    async fn encode(
        &self,
        plaintext: &[u8],
        context: &EncodeContext,
        _deadline: Instant,
    ) -> Result<Vec<u8>, CodecError> {
        // The seed is derived first, so a custom closure runs even when the
        // hint turns out to be missing.
        let seed = match &self.seed_policy {
            SeedPolicy::Custom(derive) => derive(context),
            SeedPolicy::DidNsidRkey => {
                derive_seed_did_nsid_rkey(&context.originator, &context.nsid, &context.rkey)
            }
        };
        debug_assert!(
            !seed.is_empty(),
            "substrate invariant: seed must be non-empty"
        );

        let mark = match context.current_generation_hint.as_ref() {
            Some(mark) => mark,
            None => {
                return Err(CodecError::RotationStateUnavailable {
                    codec: self.codec_id(),
                })
            }
        };
        let slug = parse_slug_from_mark(mark).expect(
            "substrate invariant: rotation mark must match the default \
             oracle's format (§4.7)",
        );

        let encoded = internal::encode(plaintext, &seed, &slug);
        Ok(encoded)
    }

    /// Decodes `encoded.content` under the seed chosen by the seed policy.
    ///
    /// # Errors
    ///
    /// Any failure of the underlying decode primitive is reported as
    /// `CodecError::Malformed`; the underlying error is discarded.
    ///
    /// # Panics
    ///
    /// In debug builds, panics when the derived seed is empty.
    async fn decode(
        &self,
        encoded: &EncodedRecord,
        context: &DecodeContext,
        _deadline: Instant,
    ) -> Result<Vec<u8>, CodecError> {
        let seed = match &self.seed_policy {
            SeedPolicy::Custom(derive) => {
                // Custom closures are written against the encode-side context.
                let as_encode = decode_ctx_as_encode_ctx(context);
                derive(&as_encode)
            }
            SeedPolicy::DidNsidRkey => {
                derive_seed_did_nsid_rkey(&context.originator, &context.nsid, &context.rkey)
            }
        };
        debug_assert!(
            !seed.is_empty(),
            "substrate invariant: seed must be non-empty"
        );

        match internal::decode(&encoded.content, &seed) {
            Ok(plaintext) => Ok(plaintext),
            Err(_) => Err(CodecError::Malformed {
                codec: self.codec_id(),
            }),
        }
    }
}
/// Reasons constructing a [`DefaultRotationOracle`] can fail.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RotationOracleConstructionError {
/// The random source failed while generating the initial slug.
#[error("CSRNG failed during initial rotation-slug generation: {0}")]
CsrngFailed(getrandom::Error),
/// Writing the rotation state file during construction failed.
///
/// The builder also returns this variant, with an empty `path` and an
/// `InvalidInput` source, when no persistence path was configured.
#[error("install-time rotation-state write failed at {path}: {source}")]
InitialPersistenceFailed {
// File the state was being written to.
path: PathBuf,
// Underlying I/O failure.
source: io::Error,
},
}
/// The rotation generation currently in force.
struct RotationState {
// Random 32-byte slug identifying the generation.
current_slug: [u8; 32],
// Wall-clock time at which `current_slug` was generated.
generated_at: SystemTime,
}
/// Message sent to the background persistence thread asking it to write the
/// given generation to the state file.
struct PersistRequest {
// Slug of the generation to persist.
slug: [u8; 32],
// Generation timestamp to persist alongside the slug.
generated_at: SystemTime,
}
/// Rotation oracle that keeps one random slug per generation, replaces it once
/// it is at least `cadence` old, and persists it to a state file.
pub struct DefaultRotationOracle {
// Current generation, shared behind a reader/writer lock.
state: Arc<RwLock<RotationState>>,
// Age at which the current slug is considered stale and is rotated.
cadence: Duration,
// Sending half of the channel feeding the background persistence thread.
persist_tx: mpsc::Sender<PersistRequest>,
}
impl DefaultRotationOracle {
pub fn for_data_dir(data_dir: PathBuf) -> Result<Self, RotationOracleConstructionError> {
Self::construct(default_state_path(&data_dir), DEFAULT_CADENCE)
}
#[must_use]
pub fn builder() -> DefaultRotationOracleBuilder {
DefaultRotationOracleBuilder {
cadence: DEFAULT_CADENCE,
persistence_path: None,
}
}
fn construct(
persistence_path: PathBuf,
cadence: Duration,
) -> Result<Self, RotationOracleConstructionError> {
let now = SystemTime::now();
let state = match read_state_file(&persistence_path) {
Some(loaded)
if now
.duration_since(loaded.generated_at)
.map(|age| age < cadence)
.unwrap_or(false) =>
{
loaded
}
_ => RotationState {
current_slug: generate_slug()
.map_err(RotationOracleConstructionError::CsrngFailed)?,
generated_at: now,
},
};
write_state_file(&persistence_path, &state).map_err(|source| {
RotationOracleConstructionError::InitialPersistenceFailed {
path: persistence_path.clone(),
source,
}
})?;
let (persist_tx, persist_rx) = mpsc::channel::<PersistRequest>();
let worker_path = persistence_path;
std::thread::spawn(move || {
while let Ok(req) = persist_rx.recv() {
let snapshot = RotationState {
current_slug: req.slug,
generated_at: req.generated_at,
};
let _ = write_state_file(&worker_path, &snapshot);
}
});
Ok(Self {
state: Arc::new(RwLock::new(state)),
cadence,
persist_tx,
})
}
}
/// Builder for a [`DefaultRotationOracle`] with a custom cadence and/or
/// state-file location.
pub struct DefaultRotationOracleBuilder {
// Rotation cadence the built oracle will use.
cadence: Duration,
// State-file location; must be set before `build` succeeds.
persistence_path: Option<PathBuf>,
}
impl DefaultRotationOracleBuilder {
    /// Sets the age at which a generation is rotated.
    #[must_use]
    pub fn cadence(self, cadence: Duration) -> Self {
        Self { cadence, ..self }
    }

    /// Sets the file the rotation state is persisted to.
    #[must_use]
    pub fn persistence_path(self, path: PathBuf) -> Self {
        Self {
            persistence_path: Some(path),
            ..self
        }
    }

    /// Constructs the oracle.
    ///
    /// # Errors
    ///
    /// Returns `InitialPersistenceFailed` (empty path, `InvalidInput` source)
    /// when no persistence path was set; otherwise propagates any failure of
    /// the oracle's construction.
    pub fn build(self) -> Result<DefaultRotationOracle, RotationOracleConstructionError> {
        match self.persistence_path {
            Some(path) => DefaultRotationOracle::construct(path, self.cadence),
            None => Err(RotationOracleConstructionError::InitialPersistenceFailed {
                path: PathBuf::new(),
                source: io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "persistence_path must be set on DefaultRotationOracle::builder()",
                ),
            }),
        }
    }
}
impl RotationOracle for DefaultRotationOracle {
    /// Returns the mark of the generation currently in force, rotating to a
    /// fresh slug first when the held one is at least `cadence` old or carries
    /// a timestamp later than the current wall clock.
    ///
    /// This implementation always returns `Some`. If the random source fails
    /// while a rotation is due, the previous generation keeps being served.
    ///
    /// # Panics
    ///
    /// Panics if the state lock has been poisoned.
    fn current_generation(&self, _ctx: &RotationContext) -> Option<RotationGenerationMark> {
        let now = SystemTime::now();
        // Fast path: a shared lock is enough while the generation is fresh.
        {
            let st = self.state.read().expect("rotation state lock not poisoned");
            let fresh = now
                .duration_since(st.generated_at)
                .map(|age| age < self.cadence)
                .unwrap_or(false);
            if fresh {
                return Some(format_mark(st.generated_at, &st.current_slug));
            }
        }

        let mut st = self.state.write().expect("rotation state lock not poisoned");
        // Re-sample the clock now that the write lock is held. Another thread
        // may have rotated while this one was waiting, stamping the state with
        // a time later than the `now` captured above; comparing against that
        // older sample would make `duration_since` fail, be read as "stale",
        // and rotate a second time with an earlier timestamp.
        let now = SystemTime::now();
        let still_stale = now
            .duration_since(st.generated_at)
            .map(|age| age >= self.cadence)
            .unwrap_or(true);
        if still_stale {
            // On CSRNG failure the existing generation is deliberately left
            // in place so a mark can still be served.
            // NOTE(review): confirm serving an expired generation is preferred
            // over returning `None` here.
            if let Ok(slug) = generate_slug() {
                st.current_slug = slug;
                st.generated_at = now;
                // Persistence is best-effort: sending only fails once the
                // background writer thread has gone away.
                let _ = self.persist_tx.send(PersistRequest {
                    slug,
                    generated_at: now,
                });
            }
        }
        Some(format_mark(st.generated_at, &st.current_slug))
    }

    /// Time at which the generation currently in force was created.
    ///
    /// # Panics
    ///
    /// Panics if the state lock has been poisoned.
    fn last_synced_at(&self) -> SystemTime {
        self.state
            .read()
            .expect("rotation state lock not poisoned")
            .generated_at
    }

    /// This oracle reports no bound on data freshness (`Duration::MAX`).
    fn data_freshness_bound(&self) -> Duration {
        Duration::MAX
    }
}
/// Location of the rotation state file under `data_dir`:
/// `<data_dir>/kryphocron/rotation.state`.
fn default_state_path(data_dir: &Path) -> PathBuf {
    let mut state_path = data_dir.to_path_buf();
    state_path.push("kryphocron");
    state_path.push("rotation.state");
    state_path
}
/// Fills a fresh 32-byte slug from `getrandom`.
///
/// # Errors
///
/// Propagates the `getrandom` error when the random source fails.
fn generate_slug() -> Result<[u8; 32], getrandom::Error> {
    let mut bytes = [0u8; 32];
    getrandom::getrandom(&mut bytes).map(|()| bytes)
}
/// Renders a generation as `laquna/<secs>/<hex slug>`, where `<secs>` is the
/// Unix timestamp zero-padded to 20 digits so marks sort chronologically as
/// strings.
///
/// A `generated_at` before the Unix epoch is rendered as timestamp `0`.
///
/// # Panics
///
/// Panics if the rendered mark is rejected by `RotationGenerationMark::new`.
fn format_mark(generated_at: SystemTime, slug: &[u8; 32]) -> RotationGenerationMark {
    let unix_secs = match generated_at.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    let rendered = format!("laquna/{:020}/{}", unix_secs, hex::encode(slug));
    RotationGenerationMark::new(rendered)
        .expect("rotation mark (92 bytes) fits BoundedString<128>")
}
/// Loads a previously persisted rotation state from `path`.
///
/// The file holds two lines: the generation's Unix timestamp in seconds, then
/// the slug as 64 hex characters. Surrounding whitespace on each line is
/// ignored, as are any further lines.
///
/// Returns `None` when the file cannot be read or does not parse, including
/// when the timestamp is too large to represent as a `SystemTime`. Callers
/// treat `None` as "no usable state".
fn read_state_file(path: &Path) -> Option<RotationState> {
    let contents = std::fs::read_to_string(path).ok()?;
    let mut lines = contents.lines();
    let secs: u64 = lines.next()?.trim().parse().ok()?;
    let hex_slug = lines.next()?.trim();
    if hex_slug.len() != 64 {
        return None;
    }
    let mut slug = [0u8; 32];
    hex::decode_to_slice(hex_slug, &mut slug).ok()?;
    // `UNIX_EPOCH + duration` panics when the sum is not representable, and
    // `secs` comes straight from the file, so a corrupt value must be rejected
    // here rather than crash the caller.
    let generated_at = UNIX_EPOCH.checked_add(Duration::from_secs(secs))?;
    Some(RotationState {
        current_slug: slug,
        generated_at,
    })
}
/// Writes `state` to `path` as two lines: the generation's Unix timestamp in
/// seconds, then the slug in hex.
///
/// Missing parent directories are created first. A `generated_at` before the
/// Unix epoch is written as timestamp `0`.
///
/// # Errors
///
/// Returns the I/O error from creating the directories or writing the file.
fn write_state_file(path: &Path, state: &RotationState) -> io::Result<()> {
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir)?,
        None => {}
    }
    let secs = match state.generated_at.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    let body = format!("{}\n{}\n", secs, hex::encode(state.current_slug));
    std::fs::write(path, body)
}
// Unit tests for the codec (round-trip and error mapping) and for the default
// rotation oracle (construction, persistence across restarts, mark format).
#[cfg(test)]
mod tests {
use super::*;
use crate::identity::TraceId;
// Builds a mark in the `laquna/<20-digit secs>/<hex slug>` layout with a fixed
// timestamp and the given slug.
fn mark_with_slug(slug: [u8; 32]) -> RotationGenerationMark {
RotationGenerationMark::new(format!(
"laquna/{:020}/{}",
1_700_000_000u64,
hex::encode(slug)
))
.expect("mark fits the BoundedString bound")
}
// Encode-side fixture: fixed nsid/originator/trace id, caller-chosen record key
// and generation hint.
fn enc_ctx(rkey: &str, mark: RotationGenerationMark) -> EncodeContext {
EncodeContext {
nsid: Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
rkey: RecordKey::new(rkey).unwrap(),
originator: Did::new("did:plc:exampleexampleexample").unwrap(),
audience_list: None,
current_generation_hint: Some(mark),
trace_id: TraceId::from_bytes([0xAB; 16]),
operator_context: Default::default(),
}
}
// Decode-side fixture using the same nsid/originator/trace id as `enc_ctx`.
fn dec_ctx(rkey: &str) -> DecodeContext {
DecodeContext {
nsid: Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
rkey: RecordKey::new(rkey).unwrap(),
originator: Did::new("did:plc:exampleexampleexample").unwrap(),
audience_list: None,
trace_id: TraceId::from_bytes([0xAB; 16]),
operator_context: Default::default(),
}
}
// Encoding then decoding with matching contexts returns the original plaintext,
// and the encoded bytes differ from it.
#[tokio::test]
async fn default_codec_round_trips() {
let codec = Codec::default();
let mark = mark_with_slug([0x5a; 32]);
let plaintext = b"a private post body";
let deadline = Instant::now();
let content = codec
.encode(plaintext, &enc_ctx("3kabcdefghij2", mark.clone()), deadline)
.await
.expect("encode succeeds");
assert_ne!(content.as_slice(), plaintext.as_slice(), "encoded bytes are not plaintext");
let record = EncodedRecord {
codec: codec.codec_id(),
content,
generation: Some(mark),
};
let recovered = codec
.decode(&record, &dec_ctx("3kabcdefghij2"), deadline)
.await
.expect("decode succeeds");
assert_eq!(recovered, plaintext);
}
// Decoding under a different record key (hence a different seed) is reported
// as `Malformed`.
#[tokio::test]
async fn decode_with_wrong_identity_seed_is_malformed() {
let codec = Codec::default();
let mark = mark_with_slug([0x5a; 32]);
let deadline = Instant::now();
let content = codec
.encode(b"secret", &enc_ctx("3kabcdefghij2", mark.clone()), deadline)
.await
.unwrap();
let record = EncodedRecord {
codec: codec.codec_id(),
content,
generation: Some(mark),
};
let err = codec
.decode(&record, &dec_ctx("3kabcdefghij3"), deadline)
.await
.unwrap_err();
assert!(matches!(err, CodecError::Malformed { .. }));
}
// Encoding without a generation hint yields `RotationStateUnavailable`.
#[tokio::test]
async fn encode_returns_rotation_unavailable_on_none_hint() {
let codec = Codec::default();
let ctx = EncodeContext {
nsid: Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
rkey: RecordKey::new("3kabcdefghij2").unwrap(),
originator: Did::new("did:plc:exampleexampleexample").unwrap(),
audience_list: None,
current_generation_hint: None,
trace_id: TraceId::from_bytes([0xAB; 16]),
operator_context: Default::default(),
};
let err = codec.encode(b"x", &ctx, Instant::now()).await.unwrap_err();
assert!(matches!(err, CodecError::RotationStateUnavailable { .. }));
}
#[test]
fn codec_id_is_laquna_0_2() {
assert_eq!(Codec::default().codec_id().as_str(), "laquna/0.2");
}
#[test]
fn requires_rotation_is_true() {
assert!(Codec::default().requires_rotation());
}
use std::sync::atomic::{AtomicU64, Ordering};
// Counter that keeps the temp paths produced within one test process distinct.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
// Returns a path under the system temp dir that is unique per process id and
// call; nothing is created on disk here.
fn unique_tmp_dir() -> PathBuf {
let n = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!("kryphocron-rot-{}-{}", std::process::id(), n))
}
// Rotation context fixture matching the codec fixtures above.
fn rot_ctx() -> RotationContext {
RotationContext {
originator: Did::new("did:plc:exampleexampleexample").unwrap(),
nsid: Nsid::new("tools.kryphocron.feed.postPrivate").unwrap(),
audience_list: None,
}
}
// Construction writes the state file and the oracle serves a `laquna/` mark.
#[test]
fn oracle_for_data_dir_constructs_and_serves_mark() {
let dir = unique_tmp_dir();
let oracle = DefaultRotationOracle::for_data_dir(dir.clone()).expect("construct");
let mark = oracle.current_generation(&rot_ctx()).expect("serves a mark");
assert!(mark.as_str().starts_with("laquna/"), "mark uses the §4.7 format");
assert!(
dir.join("kryphocron").join("rotation.state").exists(),
"install-time write created the state file"
);
let _ = std::fs::remove_dir_all(&dir);
}
// A regular file sits where the data directory is expected, so the initial
// state write must fail and surface as `InitialPersistenceFailed`.
#[test]
fn oracle_construction_fails_install_time_on_unwritable_path() {
let file_as_dir = unique_tmp_dir();
std::fs::write(&file_as_dir, b"i am a file, not a directory").unwrap();
let result = DefaultRotationOracle::for_data_dir(file_as_dir.clone());
assert!(matches!(
result,
Err(RotationOracleConstructionError::InitialPersistenceFailed { .. })
));
let _ = std::fs::remove_file(&file_as_dir);
}
// A second oracle built on the same directory within the cadence serves the
// same mark as the first.
#[test]
fn oracle_restart_preserves_slug_within_cadence() {
let dir = unique_tmp_dir();
let m1 = {
let o1 = DefaultRotationOracle::for_data_dir(dir.clone()).unwrap();
o1.current_generation(&rot_ctx()).unwrap()
};
let o2 = DefaultRotationOracle::for_data_dir(dir.clone()).unwrap();
let m2 = o2.current_generation(&rot_ctx()).unwrap();
assert_eq!(m1, m2, "restart within cadence preserves slug + generation");
let _ = std::fs::remove_dir_all(&dir);
}
// A pre-seeded state file stamped at Unix time 0 is older than the cadence, so
// the oracle must serve a different mark.
#[test]
fn oracle_restart_rotates_when_state_stale() {
let dir = unique_tmp_dir();
let state_path = dir.join("kryphocron").join("rotation.state");
std::fs::create_dir_all(state_path.parent().unwrap()).unwrap();
std::fs::write(&state_path, format!("0\n{}\n", hex::encode([0x11u8; 32]))).unwrap();
let stale_mark = format_mark(UNIX_EPOCH, &[0x11u8; 32]);
let oracle = DefaultRotationOracle::for_data_dir(dir.clone()).unwrap();
let mark = oracle.current_generation(&rot_ctx()).unwrap();
assert_ne!(mark, stale_mark, "stale on-disk state rotates to a fresh slug");
let _ = std::fs::remove_dir_all(&dir);
}
// Zero-padded timestamps make marks sort chronologically as strings; the mark
// is 7 + 20 + 1 + 64 = 92 characters long.
#[test]
fn mark_format_is_lex_sortable() {
let slug = [0xABu8; 32];
let m1 = format_mark(UNIX_EPOCH + Duration::from_secs(1_000_000), &slug);
let m2 = format_mark(UNIX_EPOCH + Duration::from_secs(2_000_000), &slug);
assert!(
m1.as_str() < m2.as_str(),
"earlier generation lex-sorts before later (rev 3 §4.7)"
);
assert_eq!(m1.as_str().len(), 92, "mark is 92 chars (within BoundedString<128>)");
}
// Building without a persistence path is rejected.
#[test]
fn builder_requires_persistence_path() {
let result = DefaultRotationOracle::builder().build();
assert!(matches!(
result,
Err(RotationOracleConstructionError::InitialPersistenceFailed { .. })
));
}
// A builder with an explicit cadence and path yields a working oracle.
#[test]
fn builder_custom_cadence_constructs() {
let dir = unique_tmp_dir();
let oracle = DefaultRotationOracle::builder()
.cadence(Duration::from_secs(3600))
.persistence_path(dir.join("kryphocron").join("rotation.state"))
.build()
.unwrap();
assert!(oracle.current_generation(&rot_ctx()).is_some());
let _ = std::fs::remove_dir_all(&dir);
}
}