use std::collections::HashMap;
use std::convert::TryFrom;
use diesel::{prelude::*, types::HasSqlType};
use super::AdminServiceStoreOperations;
use crate::admin::store::{
diesel::{
models::{
AdminEventCircuitProposalModel, AdminEventProposedCircuitModel,
AdminEventProposedNodeEndpointModel, AdminEventProposedNodeModel,
AdminEventProposedServiceArgumentModel, AdminEventProposedServiceModel,
AdminEventVoteRecordModel, AdminServiceEventModel,
},
schema::{
admin_event_circuit_proposal, admin_event_proposed_circuit, admin_event_proposed_node,
admin_event_proposed_node_endpoint, admin_event_proposed_service,
admin_event_proposed_service_argument, admin_event_vote_record, admin_service_event,
},
},
AdminServiceEvent, AdminServiceStoreError, AuthorizationType, CircuitProposalBuilder,
DurabilityType, EventIter, PersistenceType, ProposalType, ProposedCircuitBuilder, ProposedNode,
ProposedNodeBuilder, ProposedService, ProposedServiceBuilder, RouteType, VoteRecord,
};
use crate::public_key::PublicKey;
pub(in crate::admin::store::diesel) trait AdminServiceStoreListEventsOperation {
fn list_events(&self, events_id: Vec<i64>) -> Result<EventIter, AdminServiceStoreError>;
}
impl<'a, C> AdminServiceStoreListEventsOperation for AdminServiceStoreOperations<'a, C>
where
C: diesel::Connection,
C::Backend: HasSqlType<diesel::sql_types::BigInt>,
String: diesel::deserialize::FromSql<diesel::sql_types::Text, C::Backend>,
i64: diesel::deserialize::FromSql<diesel::sql_types::BigInt, C::Backend>,
i32: diesel::deserialize::FromSql<diesel::sql_types::Integer, C::Backend>,
Vec<u8>: diesel::deserialize::FromSql<diesel::sql_types::Binary, C::Backend>,
i16: diesel::deserialize::FromSql<diesel::sql_types::SmallInt, C::Backend>,
{
fn list_events(&self, event_ids: Vec<i64>) -> Result<EventIter, AdminServiceStoreError> {
self.conn.transaction::<EventIter, _, _>(|| {
let event_models: Vec<(
AdminServiceEventModel,
AdminEventCircuitProposalModel,
AdminEventProposedCircuitModel,
)> = admin_service_event::table
.filter(admin_service_event::id.eq_any(&event_ids))
.inner_join(
admin_event_circuit_proposal::table
.on(admin_service_event::id.eq(admin_event_circuit_proposal::event_id)),
)
.inner_join(
admin_event_proposed_circuit::table
.on(admin_service_event::id.eq(admin_event_proposed_circuit::event_id)),
)
.load::<(
AdminServiceEventModel,
AdminEventCircuitProposalModel,
AdminEventProposedCircuitModel,
)>(self.conn)?;
let events_map: HashMap<
i64,
(
AdminServiceEventModel,
CircuitProposalBuilder,
ProposedCircuitBuilder,
),
> = event_models
.into_iter()
.map(
|(event_model, circuit_proposal_model, proposed_circuit_model)| {
let proposal_builder = CircuitProposalBuilder::new()
.with_proposal_type(&ProposalType::try_from(
circuit_proposal_model.proposal_type.to_string(),
)?)
.with_circuit_id(&circuit_proposal_model.circuit_id)
.with_circuit_hash(&circuit_proposal_model.circuit_hash)
.with_requester(&PublicKey::from_bytes(
circuit_proposal_model.requester.to_vec(),
))
.with_requester_node_id(&circuit_proposal_model.requester_node_id);
let mut proposed_circuit_builder = ProposedCircuitBuilder::new()
.with_circuit_id(&proposed_circuit_model.circuit_id)
.with_authorization_type(&AuthorizationType::try_from(
proposed_circuit_model.authorization_type,
)?)
.with_persistence(&PersistenceType::try_from(
proposed_circuit_model.persistence,
)?)
.with_durability(&DurabilityType::try_from(
proposed_circuit_model.durability,
)?)
.with_routes(&RouteType::try_from(proposed_circuit_model.routes)?)
.with_circuit_management_type(
&proposed_circuit_model.circuit_management_type,
);
if let Some(application_metadata) =
&proposed_circuit_model.application_metadata
{
proposed_circuit_builder = proposed_circuit_builder
.with_application_metadata(application_metadata);
}
if let Some(comments) = &proposed_circuit_model.comments {
proposed_circuit_builder =
proposed_circuit_builder.with_comments(comments);
}
if let Some(display_name) = &proposed_circuit_model.display_name {
proposed_circuit_builder =
proposed_circuit_builder.with_display_name(display_name);
}
Ok((
event_model.id,
(event_model, proposal_builder, proposed_circuit_builder),
))
},
)
.collect::<Result<HashMap<i64, (_, _, _)>, AdminServiceStoreError>>()?;
let mut proposed_services: HashMap<(i64, String), IndexedServiceBuilder> =
HashMap::new();
for (proposed_service, opt_arg) in admin_event_proposed_service::table
.filter(admin_event_proposed_service::event_id.eq_any(&event_ids))
.left_join(
admin_event_proposed_service_argument::table.on(
admin_event_proposed_service::event_id
.eq(admin_event_proposed_service_argument::event_id)
.and(
admin_event_proposed_service::service_id
.eq(admin_event_proposed_service_argument::service_id),
),
),
)
.select((
admin_event_proposed_service::all_columns,
admin_event_proposed_service_argument::all_columns.nullable(),
))
.load::<(
AdminEventProposedServiceModel,
Option<AdminEventProposedServiceArgumentModel>,
)>(self.conn)?
{
if let Some(arg_model) = opt_arg {
if let Some(indexed_service) = proposed_services.get_mut(&(
proposed_service.event_id,
proposed_service.service_id.to_string(),
)) {
indexed_service.arguments.push(arg_model);
} else {
proposed_services
.entry((
proposed_service.event_id,
proposed_service.service_id.to_string(),
))
.or_insert_with(|| IndexedServiceBuilder {
position: proposed_service.position,
arguments: vec![arg_model],
builder: ProposedServiceBuilder::new()
.with_service_id(&proposed_service.service_id)
.with_service_type(&proposed_service.service_type)
.with_node_id(&proposed_service.node_id),
});
}
}
}
let mut built_proposed_services: HashMap<i64, Vec<ProposedService>> = HashMap::new();
let mut ordered_proposed_services: Vec<((i64, String), IndexedServiceBuilder)> =
proposed_services.into_iter().collect();
ordered_proposed_services
.sort_by_key(|((_, _), indexed_service)| indexed_service.position);
for ((event_id, _), mut indexed_service) in ordered_proposed_services.into_iter() {
indexed_service.arguments.sort_by_key(|arg| arg.position);
indexed_service.builder = indexed_service.builder.with_arguments(
&indexed_service
.arguments
.iter()
.map(|arg_mod| (arg_mod.key.to_string(), arg_mod.value.to_string()))
.collect::<Vec<(String, String)>>(),
);
let proposed_service = indexed_service
.builder
.build()
.map_err(AdminServiceStoreError::InvalidStateError)?;
if let Some(service_list) = built_proposed_services.get_mut(&event_id) {
service_list.push(proposed_service);
} else {
built_proposed_services.insert(event_id, vec![proposed_service]);
}
}
let mut proposed_nodes: HashMap<(i64, String), IndexedNodeBuilder> = HashMap::new();
for (node, endpoint) in admin_event_proposed_node::table
.filter(admin_event_proposed_node::event_id.eq_any(&event_ids))
.inner_join(
admin_event_proposed_node_endpoint::table.on(
admin_event_proposed_node::node_id
.eq(admin_event_proposed_node_endpoint::node_id)
.and(
admin_event_proposed_node_endpoint::event_id
.eq(admin_event_proposed_node::event_id),
),
),
)
.select((
admin_event_proposed_node::all_columns,
admin_event_proposed_node_endpoint::all_columns,
))
.load::<(
AdminEventProposedNodeModel,
AdminEventProposedNodeEndpointModel,
)>(self.conn)?
{
if let Some(proposed_node) =
proposed_nodes.get_mut(&(node.event_id, node.node_id.to_string()))
{
proposed_node.endpoints.push(endpoint);
} else {
let proposed_node = ProposedNodeBuilder::new().with_node_id(&node.node_id);
proposed_nodes.insert(
(node.event_id, node.node_id),
IndexedNodeBuilder {
position: node.position,
endpoints: vec![endpoint],
builder: proposed_node,
},
);
}
}
let mut ordered_proposed_nodes: Vec<((i64, String), IndexedNodeBuilder)> =
proposed_nodes.into_iter().collect();
ordered_proposed_nodes.sort_by_key(|((_, _), indexed_node)| indexed_node.position);
let mut built_proposed_nodes: HashMap<i64, Vec<ProposedNode>> = HashMap::new();
for ((event_id, _), mut proposed_node) in ordered_proposed_nodes.into_iter() {
if let Some(nodes) = built_proposed_nodes.get_mut(&event_id) {
proposed_node
.endpoints
.sort_by_key(|endpoint_mods| endpoint_mods.position);
let endpoints = proposed_node
.endpoints
.iter()
.map(|endpoint_mod| endpoint_mod.endpoint.to_string())
.collect::<Vec<String>>();
nodes.push(
proposed_node
.builder
.with_endpoints(&endpoints)
.build()
.map_err(AdminServiceStoreError::InvalidStateError)?,
)
} else {
proposed_node
.endpoints
.sort_by_key(|endpoint_mods| endpoint_mods.position);
let endpoints = proposed_node
.endpoints
.iter()
.map(|endpoint_mod| endpoint_mod.endpoint.to_string())
.collect::<Vec<String>>();
built_proposed_nodes.insert(
event_id,
vec![proposed_node
.builder
.with_endpoints(&endpoints)
.build()
.map_err(AdminServiceStoreError::InvalidStateError)?],
);
}
}
let mut vote_records: HashMap<i64, Vec<VoteRecord>> = HashMap::new();
for vote in admin_event_vote_record::table
.filter(admin_event_vote_record::event_id.eq_any(&event_ids))
.order(admin_event_vote_record::position)
.load::<AdminEventVoteRecordModel>(self.conn)?
.into_iter()
{
if let Some(votes) = vote_records.get_mut(&vote.event_id) {
votes.push(
VoteRecord::try_from(&vote)
.map_err(AdminServiceStoreError::InvalidStateError)?,
);
} else {
vote_records.insert(
vote.event_id,
vec![VoteRecord::try_from(&vote)
.map_err(AdminServiceStoreError::InvalidStateError)?],
);
}
}
let mut events: Vec<AdminServiceEvent> = Vec::new();
for (event_id, (event_model, mut proposal_builder, mut proposed_circuit_builder)) in
events_map
{
if let Some(services) = built_proposed_services.get(&event_id) {
proposed_circuit_builder = proposed_circuit_builder.with_roster(services);
}
if let Some(nodes) = built_proposed_nodes.get(&event_id) {
proposed_circuit_builder = proposed_circuit_builder.with_members(nodes);
}
if let Some(votes) = vote_records.get(&event_id) {
proposal_builder = proposal_builder.with_votes(votes);
}
let proposal = proposal_builder
.with_circuit(
&proposed_circuit_builder
.build()
.map_err(AdminServiceStoreError::InvalidStateError)?,
)
.build()
.map_err(AdminServiceStoreError::InvalidStateError)?;
events.push(AdminServiceEvent::try_from((event_model, proposal))?)
}
events.sort_by_key(|a| *a.event_id());
Ok(Box::new(events.into_iter()))
})
}
}
struct IndexedNodeBuilder {
position: i32,
endpoints: Vec<AdminEventProposedNodeEndpointModel>,
builder: ProposedNodeBuilder,
}
struct IndexedServiceBuilder {
position: i32,
arguments: Vec<AdminEventProposedServiceArgumentModel>,
builder: ProposedServiceBuilder,
}