1use crate::{ChannelMembershipFact, ChannelParticipantEvent};
10use aura_core::effects::{JournalEffects, OrderClockEffects};
11use aura_core::hash::hash;
12use aura_core::time::{OrderTime, TimeStamp};
13use aura_core::types::identifiers::{AuthorityId, ChannelId, ContextId};
14use aura_core::{AuraError, FactValue, Journal, Result};
15use aura_journal::{
16 fact::{Fact, FactContent, JournalNamespace, RelationalFact},
17 reduce_context, ChannelEpochState, DomainFact, FactJournal, ProtocolRelationalFact,
18};
19
20#[async_trait::async_trait]
29pub trait AmpJournalEffects: JournalEffects + OrderClockEffects + Sized {
30 async fn fetch_context_journal(&self, context: ContextId) -> Result<FactJournal>;
32
33 async fn insert_relational_fact(&self, fact: RelationalFact) -> Result<()>;
35
36 fn context_store(&self) -> AmpContextStore<'_, Self>
38 where
39 Self: Sized,
40 {
41 AmpContextStore { effects: self }
42 }
43}
44
45#[async_trait::async_trait]
48impl<E: JournalEffects + OrderClockEffects> AmpJournalEffects for E {
49 async fn fetch_context_journal(&self, context: ContextId) -> Result<FactJournal> {
50 let journal = self.get_journal().await?;
51 let contents = extract_fact_contents(&journal);
52 Ok(build_context_journal(context, contents))
53 }
54
55 async fn insert_relational_fact(&self, fact: RelationalFact) -> Result<()> {
56 let context = fact_context(&fact)?;
57 let order = self
58 .order_time()
59 .await
60 .map_err(|e| AuraError::internal(e.to_string()))?;
61 let content = FactContent::Relational(fact);
62 let bytes =
63 serde_json::to_vec(&content).map_err(|e| AuraError::serialization(e.to_string()))?;
64 let key = format!("relational:{}:{}", context, hex::encode(order.0));
65
66 let mut delta = Journal::new();
67 delta.facts.insert(key, FactValue::Bytes(bytes))?;
68
69 let current = self.get_journal().await?;
70 let merged = self.merge_facts(current, delta).await?;
71 self.persist_journal(&merged).await?;
72 Ok(())
73 }
74}
75
76pub struct AmpContextStore<'a, E: ?Sized + JournalEffects + OrderClockEffects> {
85 effects: &'a E,
86}
87
88impl<'a, E: ?Sized + JournalEffects + OrderClockEffects> AmpContextStore<'a, E> {
89 pub async fn fetch_context_journal(&self, context: ContextId) -> Result<FactJournal> {
91 let journal = self.effects.get_journal().await?;
92 let contents = extract_fact_contents(&journal);
93 Ok(build_context_journal(context, contents))
94 }
95
96 pub async fn insert_relational_fact(&self, fact: RelationalFact) -> Result<()> {
98 let context = fact_context(&fact)?;
99 let order = self
100 .effects
101 .order_time()
102 .await
103 .map_err(|e| AuraError::internal(e.to_string()))?;
104 let content = FactContent::Relational(fact);
105 let bytes =
106 serde_json::to_vec(&content).map_err(|e| AuraError::serialization(e.to_string()))?;
107 let key = format!("relational:{}:{}", context, hex::encode(order.0));
108
109 let mut delta = Journal::new();
110 delta.facts.insert(key, FactValue::Bytes(bytes))?;
111
112 let current = self.effects.get_journal().await?;
113 let merged = self.effects.merge_facts(current, delta).await?;
114 self.effects.persist_journal(&merged).await?;
115 Ok(())
116 }
117}
118
119pub async fn get_channel_state<A: AmpJournalEffects>(
128 effects: &A,
129 context: ContextId,
130 channel: ChannelId,
131) -> Result<ChannelEpochState> {
132 let journal = effects.fetch_context_journal(context).await?;
133 let state = reduce_context(&journal)
134 .map_err(|e| AuraError::internal(format!("context reduction failed: {e}")))?;
135 state
136 .channel_epochs
137 .get(&channel)
138 .cloned()
139 .ok_or_else(|| AuraError::not_found("channel state not found"))
140}
141
142pub async fn list_channel_participants<A: AmpJournalEffects>(
144 effects: &A,
145 context: ContextId,
146 channel: ChannelId,
147) -> Result<Vec<AuthorityId>> {
148 let journal = effects.fetch_context_journal(context).await?;
149 let mut participants = std::collections::BTreeSet::new();
150
151 for fact in journal.iter_facts() {
152 let FactContent::Relational(RelationalFact::Generic { envelope, .. }) = &fact.content
153 else {
154 continue;
155 };
156 let Some(membership) = ChannelMembershipFact::from_envelope(envelope) else {
157 continue;
158 };
159 if membership.context() != context || membership.channel() != channel {
160 continue;
161 }
162 match membership.event() {
163 ChannelParticipantEvent::Joined => {
164 participants.insert(membership.participant());
165 }
166 ChannelParticipantEvent::Left => {
167 participants.remove(&membership.participant());
168 }
169 }
170 }
171
172 Ok(participants.into_iter().collect())
173}
174
175pub(crate) fn fact_context(fact: &RelationalFact) -> Result<ContextId> {
181 match fact {
182 RelationalFact::Protocol(ProtocolRelationalFact::AmpChannelCheckpoint(cp)) => {
183 Ok(cp.context)
184 }
185 RelationalFact::Protocol(ProtocolRelationalFact::AmpProposedChannelEpochBump(b)) => {
186 Ok(b.context)
187 }
188 RelationalFact::Protocol(ProtocolRelationalFact::AmpCommittedChannelEpochBump(b)) => {
189 Ok(b.context)
190 }
191 RelationalFact::Protocol(ProtocolRelationalFact::AmpChannelPolicy(p)) => Ok(p.context),
192 RelationalFact::Protocol(ProtocolRelationalFact::AmpChannelBootstrap(b)) => Ok(b.context),
193 RelationalFact::Generic {
194 context_id,
195 envelope,
196 } if envelope.type_id.as_str().starts_with("amp-") => Ok(*context_id),
197 _ => Err(AuraError::invalid("fact not AMP-context scoped")),
198 }
199}
200
201fn extract_fact_contents(journal: &Journal) -> Vec<(Option<OrderTime>, FactContent)> {
203 journal
204 .read_facts()
205 .iter()
206 .filter_map(|(key, value)| {
207 let content = match value {
208 FactValue::Bytes(bytes) => serde_json::from_slice(bytes).ok(),
209 FactValue::String(text) => serde_json::from_str(text).ok(),
210 FactValue::Nested(nested) => serde_json::to_vec(nested)
211 .ok()
212 .and_then(|bytes| serde_json::from_slice(&bytes).ok()),
213 _ => None,
214 };
215 content.map(|content| (parse_order_from_key(key.as_str()), content))
216 })
217 .collect()
218}
219
220fn parse_order_from_key(key: &str) -> Option<OrderTime> {
222 let suffix = key.rsplit(':').next()?;
223 let bytes = hex::decode(suffix).ok()?;
224 if bytes.len() != 32 {
225 return None;
226 }
227 let mut order = [0u8; 32];
228 order.copy_from_slice(&bytes);
229 Some(OrderTime(order))
230}
231
232fn build_context_journal(
234 context: ContextId,
235 contents: Vec<(Option<OrderTime>, FactContent)>,
236) -> FactJournal {
237 let mut facts = std::collections::BTreeSet::new();
238
239 for (order_hint, content) in contents {
240 if let FactContent::Relational(ref relational) = content {
241 if fact_context(relational).ok() != Some(context) {
242 continue;
243 }
244
245 let bytes = serde_json::to_vec(&content).unwrap_or_default();
246 let order = order_hint.unwrap_or_else(|| OrderTime(hash(&bytes)));
247 let timestamp = TimeStamp::OrderClock(order.clone());
248 facts.insert(Fact::new(order, timestamp, content));
249 }
250 }
251
252 FactJournal {
253 namespace: JournalNamespace::Context(context),
254 facts,
255 }
256}