aura_agent/runtime/effects/
amp.rs1use super::{AuraEffectSystem, DEFAULT_WINDOW};
2use async_trait::async_trait;
3use aura_core::effects::{
4 AmpChannelEffects, AmpChannelError, AmpCiphertext, ChannelCloseParams, ChannelCreateParams,
5 ChannelJoinParams, ChannelLeaveParams, ChannelSendParams, RandomCoreEffects,
6 RandomExtendedEffects,
7};
8use aura_core::hash::hash;
9use aura_core::{AuraError, ChannelId, Hash32};
10use aura_journal::DomainFact;
11use aura_protocol::amp::{AmpJournalEffects, ChannelMembershipFact, ChannelParticipantEvent};
12use aura_protocol::effects::TreeEffects;
13
14#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
15#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
16impl AmpChannelEffects for AuraEffectSystem {
17 async fn create_channel(
18 &self,
19 params: ChannelCreateParams,
20 ) -> Result<ChannelId, AmpChannelError> {
21 let channel = if let Some(id) = params.channel {
22 id
23 } else {
24 let bytes = self.random_bytes(32).await;
25 ChannelId::from_bytes(hash(&bytes))
26 };
27
28 let window = params.skip_window.unwrap_or(DEFAULT_WINDOW);
29
30 let checkpoint = aura_journal::fact::ChannelCheckpoint {
31 context: params.context,
32 channel,
33 chan_epoch: 0,
34 base_gen: 0,
35 window,
36 ck_commitment: Hash32::default(),
37 skip_window_override: Some(window),
38 };
39
40 self.insert_relational_fact(aura_journal::fact::RelationalFact::Protocol(
41 aura_journal::ProtocolRelationalFact::AmpChannelCheckpoint(checkpoint),
42 ))
43 .await
44 .map_err(map_amp_err)?;
45
46 if params.topic.is_some() || params.skip_window.is_some() {
47 let policy = aura_journal::fact::ChannelPolicy {
48 context: params.context,
49 channel,
50 skip_window: params.skip_window.or(Some(window)),
51 };
52 self.insert_relational_fact(aura_journal::fact::RelationalFact::Protocol(
53 aura_journal::ProtocolRelationalFact::AmpChannelPolicy(policy),
54 ))
55 .await
56 .map_err(map_amp_err)?;
57 }
58 Ok(channel)
59 }
60
61 async fn close_channel(&self, params: ChannelCloseParams) -> Result<(), AmpChannelError> {
62 let state = aura_protocol::amp::get_channel_state(self, params.context, params.channel)
63 .await
64 .map_err(map_amp_err)?;
65 let bump_nonce = self.random_uuid().await.as_bytes().to_vec();
66 let bump_id = Hash32(hash(&bump_nonce));
67 let proposal = aura_journal::fact::ProposedChannelEpochBump {
68 context: params.context,
69 channel: params.channel,
70 parent_epoch: state.chan_epoch,
71 new_epoch: state.chan_epoch + 1,
72 bump_id,
73 reason: aura_journal::fact::ChannelBumpReason::Routine,
74 };
75
76 aura_protocol::amp::emit_proposed_bump(self, proposal.clone())
77 .await
78 .map_err(map_amp_err)?;
79
80 let policy =
81 aura_core::threshold::policy_for(aura_core::threshold::CeremonyFlow::AmpEpochBump);
82 if policy.allows_mode(aura_core::threshold::AgreementMode::ConsensusFinalized) {
83 let tree_state = self.get_current_state().await.map_err(map_amp_err)?;
84 let journal = self
85 .fetch_context_journal(params.context)
86 .await
87 .map_err(map_amp_err)?;
88 let mut hasher = aura_core::hash::hasher();
89 hasher.update(b"RELATIONAL_CONTEXT_FACTS");
90 hasher.update(params.context.as_bytes());
91 for fact in journal.facts.iter() {
92 let bytes = aura_core::util::serialization::to_vec(fact).map_err(|e| {
93 map_amp_err(AuraError::internal(format!(
94 "Failed to serialize context fact: {e}"
95 )))
96 })?;
97 hasher.update(&bytes);
98 }
99 let context_commitment = Hash32(hasher.finalize());
100 let prestate = aura_core::Prestate::new(
101 vec![(self.authority_id, Hash32(tree_state.root_commitment))],
102 context_commitment,
103 )
104 .map_err(|e| map_amp_err(AuraError::invalid(format!("Invalid AMP prestate: {e}"))))?;
105 let consensus_params = crate::runtime::consensus::build_consensus_params(
106 params.context,
107 self,
108 self.authority_id,
109 self,
110 )
111 .await
112 .map_err(map_amp_err)?;
113 let transcript_ref = self
114 .latest_dkg_transcript_commit(self.authority_id, params.context)
115 .await
116 .map_err(map_amp_err)?
117 .and_then(|commit| commit.blob_ref.or(Some(commit.transcript_hash)));
118
119 aura_protocol::amp::commit_bump_with_consensus(
120 self,
121 &prestate,
122 &proposal,
123 consensus_params.key_packages,
124 consensus_params.group_public_key,
125 transcript_ref,
126 )
127 .await
128 .map_err(map_amp_err)?;
129 }
130
131 let policy = aura_journal::fact::ChannelPolicy {
132 context: params.context,
133 channel: params.channel,
134 skip_window: Some(0),
135 };
136
137 self.insert_relational_fact(aura_journal::fact::RelationalFact::Protocol(
138 aura_journal::ProtocolRelationalFact::AmpChannelPolicy(policy),
139 ))
140 .await
141 .map_err(map_amp_err)?;
142
143 Ok(())
144 }
145
146 async fn join_channel(&self, params: ChannelJoinParams) -> Result<(), AmpChannelError> {
147 aura_protocol::amp::get_channel_state(self, params.context, params.channel)
148 .await
149 .map_err(map_amp_err)?;
150 let timestamp = ChannelMembershipFact::random_timestamp(self).await;
151 let membership = ChannelMembershipFact::new(
152 params.context,
153 params.channel,
154 params.participant,
155 ChannelParticipantEvent::Joined,
156 timestamp,
157 );
158 self.insert_relational_fact(membership.to_generic())
159 .await
160 .map_err(map_amp_err)?;
161
162 tracing::debug!(
163 "Participant {:?} joined channel {:?} in context {:?}",
164 params.participant,
165 params.channel,
166 params.context
167 );
168
169 Ok(())
170 }
171
172 async fn leave_channel(&self, params: ChannelLeaveParams) -> Result<(), AmpChannelError> {
173 aura_protocol::amp::get_channel_state(self, params.context, params.channel)
174 .await
175 .map_err(map_amp_err)?;
176 let timestamp = ChannelMembershipFact::random_timestamp(self).await;
177 let membership = ChannelMembershipFact::new(
178 params.context,
179 params.channel,
180 params.participant,
181 ChannelParticipantEvent::Left,
182 timestamp,
183 );
184 self.insert_relational_fact(membership.to_generic())
185 .await
186 .map_err(map_amp_err)?;
187
188 tracing::debug!(
189 "Participant {:?} left channel {:?} in context {:?}",
190 params.participant,
191 params.channel,
192 params.context
193 );
194
195 Ok(())
196 }
197
198 async fn send_message(
199 &self,
200 params: ChannelSendParams,
201 ) -> Result<AmpCiphertext, AmpChannelError> {
202 let config = aura_protocol::amp::config::AmpRuntimeConfig::default();
203 aura_protocol::amp::amp_send(
204 self,
205 params.context,
206 params.channel,
207 params.plaintext,
208 &config,
209 )
210 .await
211 .map_err(map_amp_err)
212 }
213}
214
215fn map_amp_err(e: aura_core::AuraError) -> AmpChannelError {
216 AmpChannelError::Internal(e.to_string())
217}