Skip to main content

aura_amp/
journal.rs

1//! AMP journal effects and context journal operations.
2//!
3//! This module provides the `AmpJournalEffects` trait adapter that bridges
4//! Layer 4 AMP operations to Layer 2 journal facts. It handles:
5//! - Fetching and building context-scoped fact journals
6//! - Inserting relational facts (checkpoints, bumps, policies)
7//! - Channel state reduction via journal queries
8
9use crate::{ChannelMembershipFact, ChannelParticipantEvent};
10use aura_core::effects::{JournalEffects, OrderClockEffects};
11use aura_core::hash::hash;
12use aura_core::time::{OrderTime, TimeStamp};
13use aura_core::types::identifiers::{AuthorityId, ChannelId, ContextId};
14use aura_core::{AuraError, FactValue, Journal, Result};
15use aura_journal::{
16    fact::{Fact, FactContent, JournalNamespace, RelationalFact},
17    reduce_context, ChannelEpochState, DomainFact, FactJournal, ProtocolRelationalFact,
18};
19
20// ============================================================================
21// AmpJournalEffects Trait
22// ============================================================================
23
24/// Protocol-layer journal adapter for AMP.
25///
26/// This trait extends `JournalEffects` and `OrderClockEffects` to provide
27/// AMP-specific operations for managing context journals and relational facts.
28#[async_trait::async_trait]
29pub trait AmpJournalEffects: JournalEffects + OrderClockEffects + Sized {
30    /// Fetch the full context journal (fact-based) for reduction.
31    async fn fetch_context_journal(&self, context: ContextId) -> Result<FactJournal>;
32
33    /// Insert a relational fact (AMP checkpoint/bump/policy/evidence).
34    async fn insert_relational_fact(&self, fact: RelationalFact) -> Result<()>;
35
36    /// Scoped context store wrapper to avoid leaking storage keys.
37    fn context_store(&self) -> AmpContextStore<'_, Self>
38    where
39        Self: Sized,
40    {
41        AmpContextStore { effects: self }
42    }
43}
44
45/// Blanket implementation of `AmpJournalEffects` for any type implementing
46/// `JournalEffects + OrderClockEffects`.
47#[async_trait::async_trait]
48impl<E: JournalEffects + OrderClockEffects> AmpJournalEffects for E {
49    async fn fetch_context_journal(&self, context: ContextId) -> Result<FactJournal> {
50        let journal = self.get_journal().await?;
51        let contents = extract_fact_contents(&journal);
52        Ok(build_context_journal(context, contents))
53    }
54
55    async fn insert_relational_fact(&self, fact: RelationalFact) -> Result<()> {
56        let context = fact_context(&fact)?;
57        let order = self
58            .order_time()
59            .await
60            .map_err(|e| AuraError::internal(e.to_string()))?;
61        let content = FactContent::Relational(fact);
62        let bytes =
63            serde_json::to_vec(&content).map_err(|e| AuraError::serialization(e.to_string()))?;
64        let key = format!("relational:{}:{}", context, hex::encode(order.0));
65
66        let mut delta = Journal::new();
67        delta.facts.insert(key, FactValue::Bytes(bytes))?;
68
69        let current = self.get_journal().await?;
70        let merged = self.merge_facts(current, delta).await?;
71        self.persist_journal(&merged).await?;
72        Ok(())
73    }
74}
75
76// ============================================================================
77// AmpContextStore
78// ============================================================================
79
80/// Focused context journal helper that hides storage keys/serialization.
81///
82/// This provides a scoped view into the journal for a specific context,
83/// avoiding direct manipulation of storage keys.
84pub struct AmpContextStore<'a, E: ?Sized + JournalEffects + OrderClockEffects> {
85    effects: &'a E,
86}
87
88impl<'a, E: ?Sized + JournalEffects + OrderClockEffects> AmpContextStore<'a, E> {
89    /// Fetch the context journal for reduction.
90    pub async fn fetch_context_journal(&self, context: ContextId) -> Result<FactJournal> {
91        let journal = self.effects.get_journal().await?;
92        let contents = extract_fact_contents(&journal);
93        Ok(build_context_journal(context, contents))
94    }
95
96    /// Insert a relational fact into the journal.
97    pub async fn insert_relational_fact(&self, fact: RelationalFact) -> Result<()> {
98        let context = fact_context(&fact)?;
99        let order = self
100            .effects
101            .order_time()
102            .await
103            .map_err(|e| AuraError::internal(e.to_string()))?;
104        let content = FactContent::Relational(fact);
105        let bytes =
106            serde_json::to_vec(&content).map_err(|e| AuraError::serialization(e.to_string()))?;
107        let key = format!("relational:{}:{}", context, hex::encode(order.0));
108
109        let mut delta = Journal::new();
110        delta.facts.insert(key, FactValue::Bytes(bytes))?;
111
112        let current = self.effects.get_journal().await?;
113        let merged = self.effects.merge_facts(current, delta).await?;
114        self.effects.persist_journal(&merged).await?;
115        Ok(())
116    }
117}
118
119// ============================================================================
120// Channel State Reduction
121// ============================================================================
122
123/// Reduce to AMP channel state for a (context, channel) pair.
124///
125/// This fetches the context journal and reduces it to extract the current
126/// epoch state for the specified channel.
127pub async fn get_channel_state<A: AmpJournalEffects>(
128    effects: &A,
129    context: ContextId,
130    channel: ChannelId,
131) -> Result<ChannelEpochState> {
132    let journal = effects.fetch_context_journal(context).await?;
133    let state = reduce_context(&journal)
134        .map_err(|e| AuraError::internal(format!("context reduction failed: {e}")))?;
135    state
136        .channel_epochs
137        .get(&channel)
138        .cloned()
139        .ok_or_else(|| AuraError::not_found("channel state not found"))
140}
141
142/// Reduce the current AMP channel participants for a `(context, channel)` pair.
143pub async fn list_channel_participants<A: AmpJournalEffects>(
144    effects: &A,
145    context: ContextId,
146    channel: ChannelId,
147) -> Result<Vec<AuthorityId>> {
148    let journal = effects.fetch_context_journal(context).await?;
149    let mut participants = std::collections::BTreeSet::new();
150
151    for fact in journal.iter_facts() {
152        let FactContent::Relational(RelationalFact::Generic { envelope, .. }) = &fact.content
153        else {
154            continue;
155        };
156        let Some(membership) = ChannelMembershipFact::from_envelope(envelope) else {
157            continue;
158        };
159        if membership.context() != context || membership.channel() != channel {
160            continue;
161        }
162        match membership.event() {
163            ChannelParticipantEvent::Joined => {
164                participants.insert(membership.participant());
165            }
166            ChannelParticipantEvent::Left => {
167                participants.remove(&membership.participant());
168            }
169        }
170    }
171
172    Ok(participants.into_iter().collect())
173}
174
175// ============================================================================
176// Internal Helpers
177// ============================================================================
178
179/// Extract the context ID from a relational fact.
180pub(crate) fn fact_context(fact: &RelationalFact) -> Result<ContextId> {
181    match fact {
182        RelationalFact::Protocol(ProtocolRelationalFact::AmpChannelCheckpoint(cp)) => {
183            Ok(cp.context)
184        }
185        RelationalFact::Protocol(ProtocolRelationalFact::AmpProposedChannelEpochBump(b)) => {
186            Ok(b.context)
187        }
188        RelationalFact::Protocol(ProtocolRelationalFact::AmpCommittedChannelEpochBump(b)) => {
189            Ok(b.context)
190        }
191        RelationalFact::Protocol(ProtocolRelationalFact::AmpChannelPolicy(p)) => Ok(p.context),
192        RelationalFact::Protocol(ProtocolRelationalFact::AmpChannelBootstrap(b)) => Ok(b.context),
193        RelationalFact::Generic {
194            context_id,
195            envelope,
196        } if envelope.type_id.as_str().starts_with("amp-") => Ok(*context_id),
197        _ => Err(AuraError::invalid("fact not AMP-context scoped")),
198    }
199}
200
201/// Extract fact contents from a core journal.
202fn extract_fact_contents(journal: &Journal) -> Vec<(Option<OrderTime>, FactContent)> {
203    journal
204        .read_facts()
205        .iter()
206        .filter_map(|(key, value)| {
207            let content = match value {
208                FactValue::Bytes(bytes) => serde_json::from_slice(bytes).ok(),
209                FactValue::String(text) => serde_json::from_str(text).ok(),
210                FactValue::Nested(nested) => serde_json::to_vec(nested)
211                    .ok()
212                    .and_then(|bytes| serde_json::from_slice(&bytes).ok()),
213                _ => None,
214            };
215            content.map(|content| (parse_order_from_key(key.as_str()), content))
216        })
217        .collect()
218}
219
220/// Parse an order time from a journal key suffix.
221fn parse_order_from_key(key: &str) -> Option<OrderTime> {
222    let suffix = key.rsplit(':').next()?;
223    let bytes = hex::decode(suffix).ok()?;
224    if bytes.len() != 32 {
225        return None;
226    }
227    let mut order = [0u8; 32];
228    order.copy_from_slice(&bytes);
229    Some(OrderTime(order))
230}
231
232/// Build a context-scoped fact journal from extracted contents.
233fn build_context_journal(
234    context: ContextId,
235    contents: Vec<(Option<OrderTime>, FactContent)>,
236) -> FactJournal {
237    let mut facts = std::collections::BTreeSet::new();
238
239    for (order_hint, content) in contents {
240        if let FactContent::Relational(ref relational) = content {
241            if fact_context(relational).ok() != Some(context) {
242                continue;
243            }
244
245            let bytes = serde_json::to_vec(&content).unwrap_or_default();
246            let order = order_hint.unwrap_or_else(|| OrderTime(hash(&bytes)));
247            let timestamp = TimeStamp::OrderClock(order.clone());
248            facts.insert(Fact::new(order, timestamp, content));
249        }
250    }
251
252    FactJournal {
253        namespace: JournalNamespace::Context(context),
254        facts,
255    }
256}