// aura_agent/runtime/effects/amp.rs

1use super::{AuraEffectSystem, DEFAULT_WINDOW};
2use async_trait::async_trait;
3use aura_core::effects::{
4    AmpChannelEffects, AmpChannelError, AmpCiphertext, ChannelCloseParams, ChannelCreateParams,
5    ChannelJoinParams, ChannelLeaveParams, ChannelSendParams, RandomCoreEffects,
6    RandomExtendedEffects,
7};
8use aura_core::hash::hash;
9use aura_core::{AuraError, ChannelId, Hash32};
10use aura_journal::DomainFact;
11use aura_protocol::amp::{AmpJournalEffects, ChannelMembershipFact, ChannelParticipantEvent};
12use aura_protocol::effects::TreeEffects;
13
14#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
15#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
16impl AmpChannelEffects for AuraEffectSystem {
17    async fn create_channel(
18        &self,
19        params: ChannelCreateParams,
20    ) -> Result<ChannelId, AmpChannelError> {
21        let channel = if let Some(id) = params.channel {
22            id
23        } else {
24            let bytes = self.random_bytes(32).await;
25            ChannelId::from_bytes(hash(&bytes))
26        };
27
28        let window = params.skip_window.unwrap_or(DEFAULT_WINDOW);
29
30        let checkpoint = aura_journal::fact::ChannelCheckpoint {
31            context: params.context,
32            channel,
33            chan_epoch: 0,
34            base_gen: 0,
35            window,
36            ck_commitment: Hash32::default(),
37            skip_window_override: Some(window),
38        };
39
40        self.insert_relational_fact(aura_journal::fact::RelationalFact::Protocol(
41            aura_journal::ProtocolRelationalFact::AmpChannelCheckpoint(checkpoint),
42        ))
43        .await
44        .map_err(map_amp_err)?;
45
46        if params.topic.is_some() || params.skip_window.is_some() {
47            let policy = aura_journal::fact::ChannelPolicy {
48                context: params.context,
49                channel,
50                skip_window: params.skip_window.or(Some(window)),
51            };
52            self.insert_relational_fact(aura_journal::fact::RelationalFact::Protocol(
53                aura_journal::ProtocolRelationalFact::AmpChannelPolicy(policy),
54            ))
55            .await
56            .map_err(map_amp_err)?;
57        }
58        Ok(channel)
59    }
60
61    async fn close_channel(&self, params: ChannelCloseParams) -> Result<(), AmpChannelError> {
62        let state = aura_protocol::amp::get_channel_state(self, params.context, params.channel)
63            .await
64            .map_err(map_amp_err)?;
65        let bump_nonce = self.random_uuid().await.as_bytes().to_vec();
66        let bump_id = Hash32(hash(&bump_nonce));
67        let proposal = aura_journal::fact::ProposedChannelEpochBump {
68            context: params.context,
69            channel: params.channel,
70            parent_epoch: state.chan_epoch,
71            new_epoch: state.chan_epoch + 1,
72            bump_id,
73            reason: aura_journal::fact::ChannelBumpReason::Routine,
74        };
75
76        aura_protocol::amp::emit_proposed_bump(self, proposal.clone())
77            .await
78            .map_err(map_amp_err)?;
79
80        let policy =
81            aura_core::threshold::policy_for(aura_core::threshold::CeremonyFlow::AmpEpochBump);
82        if policy.allows_mode(aura_core::threshold::AgreementMode::ConsensusFinalized) {
83            let tree_state = self.get_current_state().await.map_err(map_amp_err)?;
84            let journal = self
85                .fetch_context_journal(params.context)
86                .await
87                .map_err(map_amp_err)?;
88            let mut hasher = aura_core::hash::hasher();
89            hasher.update(b"RELATIONAL_CONTEXT_FACTS");
90            hasher.update(params.context.as_bytes());
91            for fact in journal.facts.iter() {
92                let bytes = aura_core::util::serialization::to_vec(fact).map_err(|e| {
93                    map_amp_err(AuraError::internal(format!(
94                        "Failed to serialize context fact: {e}"
95                    )))
96                })?;
97                hasher.update(&bytes);
98            }
99            let context_commitment = Hash32(hasher.finalize());
100            let prestate = aura_core::Prestate::new(
101                vec![(self.authority_id, Hash32(tree_state.root_commitment))],
102                context_commitment,
103            )
104            .map_err(|e| map_amp_err(AuraError::invalid(format!("Invalid AMP prestate: {e}"))))?;
105            let consensus_params = crate::runtime::consensus::build_consensus_params(
106                params.context,
107                self,
108                self.authority_id,
109                self,
110            )
111            .await
112            .map_err(map_amp_err)?;
113            let transcript_ref = self
114                .latest_dkg_transcript_commit(self.authority_id, params.context)
115                .await
116                .map_err(map_amp_err)?
117                .and_then(|commit| commit.blob_ref.or(Some(commit.transcript_hash)));
118
119            aura_protocol::amp::commit_bump_with_consensus(
120                self,
121                &prestate,
122                &proposal,
123                consensus_params.key_packages,
124                consensus_params.group_public_key,
125                transcript_ref,
126            )
127            .await
128            .map_err(map_amp_err)?;
129        }
130
131        let policy = aura_journal::fact::ChannelPolicy {
132            context: params.context,
133            channel: params.channel,
134            skip_window: Some(0),
135        };
136
137        self.insert_relational_fact(aura_journal::fact::RelationalFact::Protocol(
138            aura_journal::ProtocolRelationalFact::AmpChannelPolicy(policy),
139        ))
140        .await
141        .map_err(map_amp_err)?;
142
143        Ok(())
144    }
145
146    async fn join_channel(&self, params: ChannelJoinParams) -> Result<(), AmpChannelError> {
147        aura_protocol::amp::get_channel_state(self, params.context, params.channel)
148            .await
149            .map_err(map_amp_err)?;
150        let timestamp = ChannelMembershipFact::random_timestamp(self).await;
151        let membership = ChannelMembershipFact::new(
152            params.context,
153            params.channel,
154            params.participant,
155            ChannelParticipantEvent::Joined,
156            timestamp,
157        );
158        self.insert_relational_fact(membership.to_generic())
159            .await
160            .map_err(map_amp_err)?;
161
162        tracing::debug!(
163            "Participant {:?} joined channel {:?} in context {:?}",
164            params.participant,
165            params.channel,
166            params.context
167        );
168
169        Ok(())
170    }
171
172    async fn leave_channel(&self, params: ChannelLeaveParams) -> Result<(), AmpChannelError> {
173        aura_protocol::amp::get_channel_state(self, params.context, params.channel)
174            .await
175            .map_err(map_amp_err)?;
176        let timestamp = ChannelMembershipFact::random_timestamp(self).await;
177        let membership = ChannelMembershipFact::new(
178            params.context,
179            params.channel,
180            params.participant,
181            ChannelParticipantEvent::Left,
182            timestamp,
183        );
184        self.insert_relational_fact(membership.to_generic())
185            .await
186            .map_err(map_amp_err)?;
187
188        tracing::debug!(
189            "Participant {:?} left channel {:?} in context {:?}",
190            params.participant,
191            params.channel,
192            params.context
193        );
194
195        Ok(())
196    }
197
198    async fn send_message(
199        &self,
200        params: ChannelSendParams,
201    ) -> Result<AmpCiphertext, AmpChannelError> {
202        let config = aura_protocol::amp::config::AmpRuntimeConfig::default();
203        aura_protocol::amp::amp_send(
204            self,
205            params.context,
206            params.channel,
207            params.plaintext,
208            &config,
209        )
210        .await
211        .map_err(map_amp_err)
212    }
213}
214
215fn map_amp_err(e: aura_core::AuraError) -> AmpChannelError {
216    AmpChannelError::Internal(e.to_string())
217}