pub(crate) mod event;
pub(crate) mod voting;
pub(crate) mod voting_power;
use std::collections::{hash_map::Entry, HashMap, HashSet};
use serde::{Deserialize, Serialize};
use crate::{
client::{node_manager::node::Node, secret::SecretManage, Client},
types::{
api::plugins::participation::{
responses::TrackedParticipation,
types::{ParticipationEventData, ParticipationEventId, Participations, PARTICIPATION_TAG},
},
block::output::{unlock_condition::UnlockCondition, Output, OutputId},
},
wallet::{
account::{Account, OutputData},
task, Result,
},
};
/// Participations of an account, grouped first by event and then by output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountParticipationOverview {
/// For every event ID, the tracked participation of each output ID that participates in that event.
pub participations: HashMap<ParticipationEventId, HashMap<OutputId, TrackedParticipation>>,
}
/// A participation event together with the nodes that are queried for it.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ParticipationEventWithNodes {
/// ID of the participation event.
pub id: ParticipationEventId,
/// Data of the event; its `milestone_index_end()` is used to decide whether the event has ended.
pub data: ParticipationEventData,
/// Nodes (URL and optional auth) from which the client for this event is built.
pub nodes: Vec<Node>,
}
impl<S: 'static + SecretManage> Account<S>
where
    crate::wallet::Error: From<S::Error>,
{
    /// Collects the tracked participations of this account's participation outputs, grouped by
    /// event and by output.
    ///
    /// Only outputs accepted by `is_valid_participation_output` whose metadata feature parses as
    /// [`Participations`] are considered. The status of every such output is requested from the
    /// client of the respective event, unless it is already known from the cache of spent outputs
    /// or from a response received for another event. Statuses of spent outputs are written back to
    /// the cache when new ones were added, so later calls don't have to request them again.
    ///
    /// If `event_ids` is `Some`, only participations for these events are part of the result;
    /// `None` doesn't filter.
    ///
    /// # Errors
    ///
    /// Returns an error if reading or writing the cache fails, if the outputs can't be loaded, if
    /// the client for an event can't be created, if a spawned request task fails, or if a status
    /// request fails with anything other than a "not found" node error (which is ignored).
    pub async fn get_participation_overview(
        &self,
        event_ids: Option<Vec<ParticipationEventId>>,
    ) -> Result<AccountParticipationOverview> {
        /// Returns `true` if there is no filter or if the filter contains `event_id`.
        fn is_requested(filter: Option<&[ParticipationEventId]>, event_id: &ParticipationEventId) -> bool {
            filter.map_or(true, |ids| ids.contains(event_id))
        }

        log::debug!("[get_participation_overview]");

        // Statuses of spent outputs that were stored by earlier calls.
        let mut spent_cached_outputs = self
            .wallet
            .storage_manager
            .read()
            .await
            .get_cached_participation_output_status(self.details().await.index)
            .await?;
        let restored_spent_cached_outputs_len = spent_cached_outputs.len();
        log::debug!(
            "[get_participation_overview] restored_spent_cached_outputs_len: {}",
            restored_spent_cached_outputs_len
        );

        // Output IDs to request per event, and the subset of them that is already spent.
        let mut events: HashMap<ParticipationEventId, Vec<OutputId>> = HashMap::new();
        let mut spent_outputs: HashSet<OutputId> = HashSet::new();

        let outputs = self.outputs(None).await?;
        for output_data in outputs {
            // Cached outputs are served from the cache below and never requested again.
            if !is_valid_participation_output(&output_data.output)
                || spent_cached_outputs.contains_key(&output_data.output_id)
            {
                continue;
            }
            let Some(metadata) = output_data.output.features().and_then(|f| f.metadata()) else {
                continue;
            };
            // Outputs whose metadata can't be parsed as participations are skipped.
            let Ok(participations) = Participations::from_bytes(&mut metadata.data()) else {
                continue;
            };

            for participation in participations.participations {
                if !is_requested(event_ids.as_deref(), &participation.event_id) {
                    continue;
                }
                events
                    .entry(participation.event_id)
                    .or_default()
                    .push(output_data.output_id);
                if output_data.is_spent {
                    spent_outputs.insert(output_data.output_id);
                }
            }
        }

        // Start the result with everything that is already known from the cache.
        let mut participations: HashMap<ParticipationEventId, HashMap<OutputId, TrackedParticipation>> = HashMap::new();
        for (output_id, cached_status) in &spent_cached_outputs {
            for (event_id, participation) in &cached_status.participations {
                if !is_requested(event_ids.as_deref(), event_id) {
                    continue;
                }
                participations
                    .entry(*event_id)
                    .or_default()
                    .insert(*output_id, participation.clone());
            }
        }

        for (event_id, mut output_ids) in events {
            log::debug!(
                "[get_participation_overview] requesting {} outputs for event {event_id}",
                output_ids.len()
            );

            // A response received for a previous event may already have provided the participation
            // of an output for this event, such outputs don't need to be requested again.
            let known_for_event = participations.get(&event_id);
            output_ids.retain(|output_id| {
                let already_known = known_for_event.map_or(false, |known| known.contains_key(output_id));
                if already_known {
                    log::debug!(
                        "[get_participation_overview] skip requesting already known {output_id} for event {event_id}",
                    );
                }
                !already_known
            });

            // Nothing left to request, so don't build a client for this event at all.
            if output_ids.is_empty() {
                continue;
            }

            let event_client = self.get_client_for_event(&event_id).await?;

            // Request the statuses in chunks of 100 outputs, one spawned task per chunk.
            let results = futures::future::try_join_all(output_ids.chunks(100).map(ToOwned::to_owned).map(|chunk| {
                let event_client = event_client.clone();
                task::spawn(async move {
                    futures::future::join_all(chunk.iter().map(|output_id| async {
                        let output_id = *output_id;
                        (event_client.output_status(&output_id).await, output_id)
                    }))
                    .await
                })
            }))
            .await?;

            for (result, output_id) in results.into_iter().flatten() {
                match result {
                    Ok(status) => {
                        // Remember the status of spent outputs so it can be cached.
                        if spent_outputs.contains(&output_id) {
                            match spent_cached_outputs.entry(output_id) {
                                Entry::Vacant(entry) => {
                                    entry.insert(status.clone());
                                }
                                Entry::Occupied(mut entry) => {
                                    let cached_status = entry.get_mut();
                                    for (event_id, participation) in &status.participations {
                                        cached_status.participations.insert(*event_id, participation.clone());
                                    }
                                }
                            }
                        }
                        for (event_id, participation) in status.participations {
                            if !is_requested(event_ids.as_deref(), &event_id) {
                                continue;
                            }
                            participations
                                .entry(event_id)
                                .or_default()
                                .insert(output_id, participation);
                        }
                    }
                    // The queried node doesn't have a status for this output, ignore it.
                    Err(crate::client::Error::Node(crate::client::node_api::error::Error::NotFound(_))) => {}
                    Err(e) => return Err(crate::wallet::Error::Client(e.into())),
                }
            }
        }

        log::debug!(
            "[get_participation_overview] new spent_cached_outputs: {}",
            spent_cached_outputs.len()
        );
        // Only write the cache back if new outputs got added.
        if spent_cached_outputs.len() > restored_spent_cached_outputs_len {
            self.wallet
                .storage_manager
                .read()
                .await
                .set_cached_participation_output_status(self.details().await.index, &spent_cached_outputs)
                .await?;
        }

        Ok(AccountParticipationOverview { participations })
    }

    /// Returns the unspent output accepted by `is_valid_participation_output` that holds the
    /// largest amount, or `None` if the account has no such output.
    pub async fn get_voting_output(&self) -> Result<Option<OutputData>> {
        log::debug!("[get_voting_output]");
        let voting_output = self
            .unspent_outputs(None)
            .await?
            .into_iter()
            .filter(|output_data| is_valid_participation_output(&output_data.output))
            .max_by_key(|output_data| output_data.output.amount());
        Ok(voting_output)
    }

    /// Returns the client to use for requests about the event `id`.
    ///
    /// If the event is stored for this account, a new client is built from the nodes stored with
    /// the event (ignoring node health); otherwise a clone of the account's client is returned.
    pub(crate) async fn get_client_for_event(&self, id: &ParticipationEventId) -> crate::wallet::Result<Client> {
        log::debug!("[get_client_for_event]");
        let account_index = self.details().await.index;
        let events = self
            .wallet
            .storage_manager
            .read()
            .await
            .get_participation_events(account_index)
            .await?;

        let Some(event_with_nodes) = events.get(id) else {
            return Ok(self.client().clone());
        };

        let mut client_builder = Client::builder().with_ignore_node_health();
        for node in &event_with_nodes.nodes {
            client_builder = client_builder.with_node_auth(node.url.as_str(), node.auth.clone())?;
        }
        Ok(client_builder.finish().await?)
    }

    /// Removes the participations of ended events from `participations`.
    ///
    /// For an event stored for this account, "ended" means that its end milestone index is lower
    /// than the latest milestone index reported by the node. For any other event the event status
    /// is requested and compared to `"ended"`; if that request fails, the participation is kept.
    pub(crate) async fn remove_ended_participation_events(
        &self,
        participations: &mut Participations,
    ) -> crate::wallet::Result<()> {
        log::debug!("[remove_ended_participation_events]");
        let latest_milestone_index = self.client().get_info().await?.node_info.status.latest_milestone.index;
        let account_index = self.details().await.index;
        let events = self
            .wallet
            .storage_manager
            .read()
            .await
            .get_participation_events(account_index)
            .await?;

        // Copy the IDs first, because `participations` is modified while they are processed.
        let participation_event_ids = participations
            .participations
            .iter()
            .map(|participation| participation.event_id)
            .collect::<Vec<ParticipationEventId>>();

        for event_id in participation_event_ids {
            let ended = match events.get(&event_id) {
                Some(event_with_nodes) => event_with_nodes.data.milestone_index_end() < &latest_milestone_index,
                None => match self.get_participation_event_status(&event_id).await {
                    Ok(event_status) => event_status.status() == "ended",
                    // Best effort: without a status the participation is left untouched.
                    Err(_) => false,
                },
            };
            if ended {
                participations.remove(&event_id);
            }
        }
        Ok(())
    }
}
/// Checks whether `output` is a basic output whose only unlock condition is an address unlock
/// condition and whose tag feature equals [`PARTICIPATION_TAG`].
fn is_valid_participation_output(output: &Output) -> bool {
    // Only basic outputs qualify.
    let Output::Basic(basic_output) = output else {
        return false;
    };
    // Exactly one unlock condition is allowed and it has to be the address unlock condition.
    let [UnlockCondition::Address(_)] = basic_output.unlock_conditions().as_ref() else {
        return false;
    };
    match basic_output.features().tag() {
        Some(tag_feature) => tag_feature.tag() == PARTICIPATION_TAG.as_bytes(),
        None => false,
    }
}
#[cfg(test)]
impl ParticipationEventWithNodes {
    /// Creates a fixed test instance: the ID consists of 32 bytes of value 42, the data comes from
    /// `ParticipationEventData::mock()` and the node list is empty.
    pub fn mock() -> Self {
        let id = ParticipationEventId::new([42; 32]);
        let data = ParticipationEventData::mock();
        let nodes = Vec::new();
        Self { id, data, nodes }
    }
}