mod models;
mod operations;
mod schema;
use std::sync::{Arc, RwLock};
use diesel::r2d2::{ConnectionManager, Pool};
use crate::admin::messages;
use crate::admin::store::{
error::AdminServiceStoreError, AdminServiceStore, Circuit, CircuitNode, CircuitPredicate,
CircuitProposal, Service, ServiceId,
};
use crate::admin::store::{AdminServiceEvent, EventIter};
use crate::store::pool::ConnectionPool;
use operations::add_circuit::AdminServiceStoreAddCircuitOperation as _;
use operations::add_event::AdminServiceStoreAddEventOperation as _;
use operations::add_proposal::AdminServiceStoreAddProposalOperation as _;
use operations::count_circuits::AdminServiceStoreCountCircuitsOperation as _;
use operations::count_proposals::AdminServiceStoreCountProposalsOperation as _;
use operations::get_circuit::AdminServiceStoreFetchCircuitOperation as _;
use operations::get_node::AdminServiceStoreFetchNodeOperation as _;
use operations::get_proposal::AdminServiceStoreFetchProposalOperation as _;
use operations::get_service::AdminServiceStoreFetchServiceOperation as _;
use operations::list_circuits::AdminServiceStoreListCircuitsOperation as _;
use operations::list_events_by_management_type_since::AdminServiceStoreListEventsByManagementTypeSinceOperation as _;
use operations::list_events_since::AdminServiceStoreListEventsSinceOperation as _;
use operations::list_nodes::AdminServiceStoreListNodesOperation as _;
use operations::list_proposals::AdminServiceStoreListProposalsOperation as _;
use operations::list_services::AdminServiceStoreListServicesOperation as _;
use operations::remove_circuit::AdminServiceStoreRemoveCircuitOperation as _;
use operations::remove_proposal::AdminServiceStoreRemoveProposalOperation as _;
use operations::update_circuit::AdminServiceStoreUpdateCircuitOperation as _;
use operations::update_proposal::AdminServiceStoreUpdateProposalOperation as _;
use operations::upgrade::AdminServiceStoreUpgradeProposalToCircuitOperation as _;
use operations::AdminServiceStoreOperations;
pub struct DieselAdminServiceStore<C: diesel::Connection + 'static> {
connection_pool: ConnectionPool<C>,
}
impl<C: diesel::Connection> DieselAdminServiceStore<C> {
pub fn new(connection_pool: Pool<ConnectionManager<C>>) -> Self {
DieselAdminServiceStore {
connection_pool: connection_pool.into(),
}
}
pub fn new_with_write_exclusivity(
connection_pool: Arc<RwLock<Pool<ConnectionManager<C>>>>,
) -> Self {
Self {
connection_pool: connection_pool.into(),
}
}
}
#[cfg(feature = "sqlite")]
impl Clone for DieselAdminServiceStore<diesel::sqlite::SqliteConnection> {
fn clone(&self) -> Self {
Self {
connection_pool: self.connection_pool.clone(),
}
}
}
#[cfg(feature = "postgres")]
impl Clone for DieselAdminServiceStore<diesel::pg::PgConnection> {
fn clone(&self) -> Self {
Self {
connection_pool: self.connection_pool.clone(),
}
}
}
#[cfg(feature = "postgres")]
impl AdminServiceStore for DieselAdminServiceStore<diesel::pg::PgConnection> {
fn add_proposal(&self, proposal: CircuitProposal) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).add_proposal(proposal))
}
fn update_proposal(&self, proposal: CircuitProposal) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).update_proposal(proposal))
}
fn remove_proposal(&self, proposal_id: &str) -> Result<(), AdminServiceStoreError> {
self.connection_pool.execute_write(|conn| {
AdminServiceStoreOperations::new(conn).remove_proposal(proposal_id)
})
}
fn get_proposal(
&self,
proposal_id: &str,
) -> Result<Option<CircuitProposal>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_proposal(proposal_id))
}
fn list_proposals(
&self,
predicates: &[CircuitPredicate],
) -> Result<Box<dyn ExactSizeIterator<Item = CircuitProposal>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_proposals(predicates))
}
fn count_proposals(
&self,
predicates: &[CircuitPredicate],
) -> Result<u32, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).count_proposals(predicates))
}
fn add_circuit(
&self,
circuit: Circuit,
nodes: Vec<CircuitNode>,
) -> Result<(), AdminServiceStoreError> {
self.connection_pool.execute_write(|conn| {
AdminServiceStoreOperations::new(conn).add_circuit(circuit, nodes)
})
}
fn update_circuit(&self, circuit: Circuit) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).update_circuit(circuit))
}
fn remove_circuit(&self, circuit_id: &str) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).remove_circuit(circuit_id))
}
fn get_circuit(&self, circuit_id: &str) -> Result<Option<Circuit>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_circuit(circuit_id))
}
fn list_circuits(
&self,
predicates: &[CircuitPredicate],
) -> Result<Box<dyn ExactSizeIterator<Item = Circuit>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_circuits(predicates))
}
fn count_circuits(
&self,
predicates: &[CircuitPredicate],
) -> Result<u32, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).count_circuits(predicates))
}
fn upgrade_proposal_to_circuit(&self, circuit_id: &str) -> Result<(), AdminServiceStoreError> {
self.connection_pool.execute_write(|conn| {
AdminServiceStoreOperations::new(conn).upgrade_proposal_to_circuit(circuit_id)
})
}
fn get_node(&self, node_id: &str) -> Result<Option<CircuitNode>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_node(node_id))
}
fn list_nodes(
&self,
) -> Result<Box<dyn ExactSizeIterator<Item = CircuitNode>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_nodes())
}
fn get_service(
&self,
service_id: &ServiceId,
) -> Result<Option<Service>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_service(service_id))
}
fn list_services(
&self,
circuit_id: &str,
) -> Result<Box<dyn ExactSizeIterator<Item = Service>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_services(circuit_id))
}
fn add_event(
&self,
event: messages::AdminServiceEvent,
) -> Result<AdminServiceEvent, AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).add_event(event))
}
fn list_events_since(&self, start: i64) -> Result<EventIter, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_events_since(start))
}
fn list_events_by_management_type_since(
&self,
management_type: String,
start: i64,
) -> Result<EventIter, AdminServiceStoreError> {
self.connection_pool.execute_read(|conn| {
AdminServiceStoreOperations::new(conn)
.list_events_by_management_type_since(management_type, start)
})
}
fn clone_boxed(&self) -> Box<dyn AdminServiceStore> {
Box::new(self.clone())
}
}
#[cfg(feature = "sqlite")]
impl AdminServiceStore for DieselAdminServiceStore<diesel::sqlite::SqliteConnection> {
fn add_proposal(&self, proposal: CircuitProposal) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).add_proposal(proposal))
}
fn update_proposal(&self, proposal: CircuitProposal) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).update_proposal(proposal))
}
fn remove_proposal(&self, proposal_id: &str) -> Result<(), AdminServiceStoreError> {
self.connection_pool.execute_write(|conn| {
AdminServiceStoreOperations::new(conn).remove_proposal(proposal_id)
})
}
fn get_proposal(
&self,
proposal_id: &str,
) -> Result<Option<CircuitProposal>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_proposal(proposal_id))
}
fn list_proposals(
&self,
predicates: &[CircuitPredicate],
) -> Result<Box<dyn ExactSizeIterator<Item = CircuitProposal>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_proposals(predicates))
}
fn count_proposals(
&self,
predicates: &[CircuitPredicate],
) -> Result<u32, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).count_proposals(predicates))
}
fn add_circuit(
&self,
circuit: Circuit,
nodes: Vec<CircuitNode>,
) -> Result<(), AdminServiceStoreError> {
self.connection_pool.execute_write(|conn| {
AdminServiceStoreOperations::new(conn).add_circuit(circuit, nodes)
})
}
fn update_circuit(&self, circuit: Circuit) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).update_circuit(circuit))
}
fn remove_circuit(&self, circuit_id: &str) -> Result<(), AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).remove_circuit(circuit_id))
}
fn get_circuit(&self, circuit_id: &str) -> Result<Option<Circuit>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_circuit(circuit_id))
}
fn list_circuits(
&self,
predicates: &[CircuitPredicate],
) -> Result<Box<dyn ExactSizeIterator<Item = Circuit>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_circuits(predicates))
}
fn count_circuits(
&self,
predicates: &[CircuitPredicate],
) -> Result<u32, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).count_circuits(predicates))
}
fn upgrade_proposal_to_circuit(&self, circuit_id: &str) -> Result<(), AdminServiceStoreError> {
self.connection_pool.execute_write(|conn| {
AdminServiceStoreOperations::new(conn).upgrade_proposal_to_circuit(circuit_id)
})
}
fn get_node(&self, node_id: &str) -> Result<Option<CircuitNode>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_node(node_id))
}
fn list_nodes(
&self,
) -> Result<Box<dyn ExactSizeIterator<Item = CircuitNode>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_nodes())
}
fn get_service(
&self,
service_id: &ServiceId,
) -> Result<Option<Service>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).get_service(service_id))
}
fn list_services(
&self,
circuit_id: &str,
) -> Result<Box<dyn ExactSizeIterator<Item = Service>>, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_services(circuit_id))
}
fn add_event(
&self,
event: messages::AdminServiceEvent,
) -> Result<AdminServiceEvent, AdminServiceStoreError> {
self.connection_pool
.execute_write(|conn| AdminServiceStoreOperations::new(conn).add_event(event))
}
fn list_events_since(&self, start: i64) -> Result<EventIter, AdminServiceStoreError> {
self.connection_pool
.execute_read(|conn| AdminServiceStoreOperations::new(conn).list_events_since(start))
}
fn list_events_by_management_type_since(
&self,
management_type: String,
start: i64,
) -> Result<EventIter, AdminServiceStoreError> {
self.connection_pool.execute_read(|conn| {
AdminServiceStoreOperations::new(conn)
.list_events_by_management_type_since(management_type, start)
})
}
fn clone_boxed(&self) -> Box<dyn AdminServiceStore> {
Box::new(self.clone())
}
}
#[cfg(all(test, feature = "sqlite"))]
pub mod tests {
use super::*;
use crate::admin::store::{
CircuitBuilder, CircuitNodeBuilder, CircuitProposal, CircuitProposalBuilder, CircuitStatus,
ProposalType, ProposedCircuitBuilder, ProposedNodeBuilder, ProposedServiceBuilder,
ServiceBuilder, Vote, VoteRecordBuilder,
};
use crate::admin::store::{AdminServiceEventBuilder, EventType};
use crate::hex::parse_hex;
use crate::migrations::run_sqlite_migrations;
use crate::public_key::PublicKey;
use diesel::{
r2d2::{ConnectionManager, Pool},
sqlite::SqliteConnection,
};
#[test]
fn test_sqlite_migrations() {
create_connection_pool_and_migrate();
}
#[test]
fn test_add_get_proposals() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let proposal = create_proposal();
store
.add_proposal(proposal.clone())
.expect("Unable to add circuit proposal");
let fetched_proposal = store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal")
.expect("Got None when expecting proposal");
assert_eq!(proposal, fetched_proposal);
}
#[test]
fn test_list_proposals() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let proposal = create_proposal();
store
.add_proposal(proposal.clone())
.expect("Unable to add circuit proposal");
let mut proposals = store
.list_proposals(&vec![])
.expect("Unable to list proposals");
assert_eq!(proposals.next(), Some(proposal.clone()));
assert_eq!(proposals.next(), None);
let mut proposals = store
.list_proposals(&vec![CircuitPredicate::ManagementTypeEq(
"gameroom".to_string(),
)])
.expect("Unable to list proposals with management type predicate");
assert_eq!(proposals.next(), Some(proposal.clone()));
assert_eq!(proposals.next(), None);
let mut proposals = store
.list_proposals(&vec![CircuitPredicate::ManagementTypeEq(
"arcade".to_string(),
)])
.expect("Unable to list proposals with management type predicate");
assert_eq!(proposals.next(), None);
let extra_proposal = create_extra_proposal();
store
.add_proposal(extra_proposal.clone())
.expect("Unable to add circuit proposal");
let mut proposals = store
.list_proposals(&vec![CircuitPredicate::MembersInclude(vec![
"gumbo-node-000".to_string(),
])])
.expect("Unable to list proposals with members include predicate");
assert_eq!(proposals.next(), Some(extra_proposal));
assert_eq!(proposals.next(), None);
let proposals = store
.list_proposals(&vec![])
.expect("Unable to list proposals with members include predicate");
assert_eq!(proposals.len(), 2);
}
#[test]
fn test_count_proposals() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let proposal = create_proposal();
store
.add_proposal(proposal.clone())
.expect("Unable to add circuit proposal");
assert_eq!(
store
.count_proposals(&vec![])
.expect("Unable to list proposals"),
1,
);
assert_eq!(
store
.count_proposals(&vec![CircuitPredicate::ManagementTypeEq(
"gameroom".to_string(),
)])
.expect("Unable to list proposals"),
1,
);
let extra_proposal = create_extra_proposal();
store
.add_proposal(extra_proposal.clone())
.expect("Unable to add circuit proposal");
assert_eq!(
store
.count_proposals(&vec![CircuitPredicate::MembersInclude(vec![
"gumbo-node-000".to_string(),
])])
.expect("Unable to list proposals"),
1,
);
assert_eq!(
store
.count_proposals(&vec![CircuitPredicate::ManagementTypeEq(
"arcade".to_string(),
)])
.expect("Unable to list proposals"),
0,
);
}
#[test]
fn test_remove_proposals() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let proposal = create_proposal();
store
.add_proposal(proposal.clone())
.expect("Unable to add circuit proposal");
let fetched_proposal = store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal")
.expect("Got None when expecting proposal");
assert_eq!(proposal, fetched_proposal);
store
.remove_proposal("WBKLF-BBBBB")
.expect("Unable to add circuit proposal");
let fetched_proposal = store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal");
assert_eq!(None, fetched_proposal);
}
#[test]
fn test_update_proposals() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let proposal = create_proposal();
store
.add_proposal(proposal.clone())
.expect("Unable to add circuit proposal");
let fetched_proposal = store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal")
.expect("Got None when expecting proposal");
assert_eq!(proposal, fetched_proposal);
let updated_proposal = proposal
.builder()
.with_votes(&vec![VoteRecordBuilder::new()
.with_public_key(&PublicKey::from_bytes(
parse_hex("035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550")
.unwrap(),
))
.with_vote(&Vote::Accept)
.with_voter_node_id("bubba-node-000")
.build()
.expect("Unable to build vote record")])
.build()
.expect("Unable to build updated proposal");
store
.update_proposal(updated_proposal.clone())
.expect("Unable to update proposal");
let fetched_proposal = store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal")
.expect("Got None when expecting proposal");
assert_eq!(updated_proposal, fetched_proposal);
}
#[test]
fn test_upgrade_proposals() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let proposal = create_proposal();
store
.add_proposal(proposal.clone())
.expect("Unable to add circuit proposal");
let fetched_proposal = store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal")
.expect("Got None when expecting proposal");
assert_eq!(proposal, fetched_proposal);
store
.upgrade_proposal_to_circuit("WBKLF-BBBBB")
.expect("Unable to add circuit proposal");
assert!(store
.get_proposal("WBKLF-BBBBB")
.expect("Unable to get proposal")
.is_none());
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit")
.expect("Got None when expecting circuit");
assert_eq!(
create_circuit_from_proposal("WBKLF-BBBBB", CircuitStatus::Active),
fetched_circuit
);
}
#[test]
fn test_add_get_circuit_and_nodes() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
store
.add_circuit(circuit.clone(), nodes)
.expect("Unable to add circuit");
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit")
.expect("Got None when expecting circuit");
let fetched_node = store
.get_node("bubba-node-000")
.expect("Unable to get node")
.expect("Got None when expecting node");
assert_eq!(circuit, fetched_circuit);
assert_eq!(
fetched_node,
CircuitNodeBuilder::default()
.with_node_id("bubba-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-bubba:8044".into()])
.build()
.expect("Unable to build node"),
)
}
#[test]
fn test_list_circuits() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
let extra_circuit = create_extra_circuit("WBKLF-CCCCC");
let extra_nodes = create_extra_nodes();
store
.add_circuit(circuit.clone(), nodes.clone())
.expect("Unable to add circuit");
let mut circuits = store
.list_circuits(&vec![])
.expect("Unable to list circuits");
assert_eq!(circuits.next(), Some(circuit.clone()));
assert_eq!(circuits.next(), None);
let mut circuits = store
.list_circuits(&vec![CircuitPredicate::ManagementTypeEq(
"gameroom".to_string(),
)])
.expect("Unable to list circuits with management type predicate");
assert_eq!(circuits.next(), Some(circuit.clone()));
assert_eq!(circuits.next(), None);
let mut circuits = store
.list_circuits(&vec![CircuitPredicate::ManagementTypeEq(
"arcade".to_string(),
)])
.expect("Unable to list circuits with management type predicate");
assert_eq!(circuits.next(), None);
store
.add_circuit(extra_circuit.clone(), extra_nodes)
.expect("Unable to add circuit");
let mut circuits = store
.list_circuits(&vec![CircuitPredicate::MembersInclude(vec![
"gumbo-node-000".to_string(),
])])
.expect("Unable to list circuits with members include predicate");
assert_eq!(circuits.next(), Some(extra_circuit.clone()));
assert_eq!(circuits.next(), None);
let disbanded_circuit = create_circuit("WBKLF-DDDDD", CircuitStatus::Disbanded);
store
.add_circuit(disbanded_circuit.clone(), nodes.clone())
.expect("Unable to add disbanded circuit");
let mut circuits = store
.list_circuits(&vec![])
.expect("Unable to list circuits");
assert_eq!(circuits.next(), Some(extra_circuit.clone()));
assert_eq!(circuits.next(), Some(circuit.clone()));
assert_eq!(circuits.next(), None);
let mut circuits = store
.list_circuits(&vec![CircuitPredicate::CircuitStatus(
CircuitStatus::Disbanded,
)])
.expect("Unable to list circuits with `CircuitStatus` predicate");
assert_eq!(circuits.next(), Some(disbanded_circuit.clone()));
assert_eq!(circuits.next(), None);
let mut circuits = store
.list_circuits(&vec![CircuitPredicate::CircuitStatus(
CircuitStatus::Abandoned,
)])
.expect("Unable to list circuits with `CircuitStatus` predicate");
assert_eq!(circuits.next(), None);
let circuits = store
.list_circuits(&vec![])
.expect("Unable to list circuits");
assert_eq!(circuits.len(), 2);
}
#[test]
fn test_count_circuits() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
let extra_circuit = create_extra_circuit("WBKLF-CCCCC");
let extra_nodes = create_extra_nodes();
store
.add_circuit(circuit.clone(), nodes.clone())
.expect("Unable to add circuit");
assert_eq!(
store
.count_circuits(&vec![])
.expect("Unable to list circuits"),
1
);
assert_eq!(
store
.count_circuits(&vec![CircuitPredicate::ManagementTypeEq(
"gameroom".to_string(),
)])
.expect("Unable to list circuits"),
1
);
assert_eq!(
store
.count_circuits(&vec![CircuitPredicate::ManagementTypeEq(
"arcade".to_string(),
)])
.expect("Unable to list circuits"),
0
);
store
.add_circuit(extra_circuit.clone(), extra_nodes)
.expect("Unable to add circuit");
assert_eq!(
store
.count_circuits(&vec![CircuitPredicate::MembersInclude(vec![
"gumbo-node-000".to_string(),
])])
.expect("Unable to list circuits"),
1
);
let disbanded_circuit = create_circuit("WBKLF-DDDDD", CircuitStatus::Disbanded);
store
.add_circuit(disbanded_circuit.clone(), nodes.clone())
.expect("Unable to add disbanded circuit");
assert_eq!(
store
.count_circuits(&vec![])
.expect("Unable to list circuits"),
2
);
assert_eq!(
store
.count_circuits(&vec![CircuitPredicate::CircuitStatus(
CircuitStatus::Disbanded,
)])
.expect("Unable to list circuits"),
1
);
assert_eq!(
store
.count_circuits(&vec![CircuitPredicate::CircuitStatus(
CircuitStatus::Abandoned,
)])
.expect("Unable to list circuits"),
0
);
}
#[test]
fn test_remove_circuits() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
store
.add_circuit(circuit.clone(), nodes)
.expect("Unable to add circuit");
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit")
.expect("Got None when expecting circuit");
assert_eq!(circuit, fetched_circuit);
store
.remove_circuit("WBKLF-BBBBB")
.expect("Unable to add circuit");
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit");
assert_eq!(None, fetched_circuit);
}
#[test]
fn test_get_service() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
store
.add_circuit(circuit.clone(), nodes)
.expect("Unable to add circuit");
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit")
.expect("Got None when expecting circuit");
assert_eq!(circuit, fetched_circuit);
let service_id = ServiceId::new("WBKLF-BBBBB".to_string(), "a000".to_string());
let fetched_service = store
.get_service(&service_id)
.expect("Unable to get service")
.expect("Got None when expecting service");
assert_eq!(fetched_circuit.roster()[0], fetched_service);
}
#[test]
fn test_list_service() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
store
.add_circuit(circuit.clone(), nodes)
.expect("Unable to add circuit");
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit")
.expect("Got None when expecting circuit");
assert_eq!(circuit, fetched_circuit);
let mut services = store
.list_services("WBKLF-BBBBB")
.expect("Unable to get services");
assert!(fetched_circuit
.roster()
.contains(&services.next().expect("Unable to get service")));
assert!(fetched_circuit
.roster()
.contains(&services.next().expect("Unable to get service")));
assert_eq!(None, services.next());
}
#[test]
fn test_list_nodes() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let circuit = create_circuit("WBKLF-BBBBB", CircuitStatus::Active);
let nodes = create_nodes();
store
.add_circuit(circuit.clone(), nodes)
.expect("Unable to add circuit");
let fetched_circuit = store
.get_circuit("WBKLF-BBBBB")
.expect("Unable to get circuit")
.expect("Got None when expecting circuit");
assert_eq!(circuit, fetched_circuit);
let mut nodes = store.list_nodes().expect("Unable to get services");
assert!(fetched_circuit
.members()
.contains(&nodes.next().expect("Unable to get service")));
assert!(fetched_circuit
.members()
.contains(&nodes.next().expect("Unable to get service")));
assert!(nodes.next().is_none());
}
#[test]
fn test_add_list_one_event() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let event = create_proposal_submitted_messages_event("test");
store.add_event(event).expect("Unable to add event");
let events: Vec<AdminServiceEvent> = store
.list_events_since(0)
.expect("Unable to get events from store")
.collect();
assert_eq!(events.len(), 1);
assert_eq!(events, vec![create_proposal_submitted_event(1, "test")],);
}
#[test]
fn test_list_since_multiple_events() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let event_1 = create_proposal_submitted_messages_event("test");
store.add_event(event_1).expect("Unable to add event");
let event_2 = create_circuit_ready_messages_event("test");
store.add_event(event_2).expect("Unable to add event");
let events: Vec<AdminServiceEvent> = store
.list_events_since(0)
.expect("Unable to get events from store")
.collect();
assert_eq!(events.len(), 2);
assert_eq!(
events,
vec![
create_proposal_submitted_event(1, "test"),
create_circuit_ready_event(2, "test")
],
);
}
#[test]
fn test_list_since() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let event_1 = create_proposal_submitted_messages_event("test");
store.add_event(event_1).expect("Unable to add event");
let event_2 = create_circuit_ready_messages_event("test");
store.add_event(event_2).expect("Unable to add event");
let event_3 = create_proposal_vote_messages_event("test");
store.add_event(event_3).expect("Unable to add event");
let events: Vec<AdminServiceEvent> = store
.list_events_since(1)
.expect("Unable to get events from store")
.collect();
assert_eq!(events.len(), 2);
assert_eq!(
events,
vec![
create_circuit_ready_event(2, "test"),
create_proposal_vote_event(3, "test")
],
);
}
#[test]
fn test_list_one_event_by_management_type() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let event = create_proposal_submitted_messages_event("test");
store.add_event(event).expect("Unable to add event");
let event_2 = create_circuit_ready_messages_event("not-test");
store.add_event(event_2).expect("Unable to add event");
let event_3 = create_proposal_vote_messages_event("test");
store.add_event(event_3).expect("Unable to add event");
let events: Vec<AdminServiceEvent> = store
.list_events_by_management_type_since("not-test".to_string(), 0)
.expect("Unable to get events from store")
.collect();
assert_eq!(events.len(), 1);
assert_eq!(events, vec![create_circuit_ready_event(2, "not-test")],);
}
#[test]
fn test_list_event_by_management_type_since() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let event = create_proposal_submitted_messages_event("test");
store.add_event(event).expect("Unable to add event");
let event_2 = create_circuit_ready_messages_event("not-test");
store.add_event(event_2).expect("Unable to add event");
let event_3 = create_proposal_vote_messages_event("test");
store.add_event(event_3).expect("Unable to add event");
let events: Vec<AdminServiceEvent> = store
.list_events_by_management_type_since("not-test".to_string(), 1)
.expect("Unable to get events from store")
.collect();
assert_eq!(events.len(), 1);
assert_eq!(events, vec![create_circuit_ready_event(2, "not-test")],);
}
#[test]
fn test_list_multiple_events_by_management_type() {
let pool = create_connection_pool_and_migrate();
let store = DieselAdminServiceStore::new(pool);
let event = create_proposal_submitted_messages_event("test");
store.add_event(event).expect("Unable to add event");
let event_2 = create_circuit_ready_messages_event("not-test");
store.add_event(event_2).expect("Unable to add event");
let event_3 = create_proposal_vote_messages_event("test");
store.add_event(event_3).expect("Unable to add event");
let events: Vec<AdminServiceEvent> = store
.list_events_by_management_type_since("test".to_string(), 0)
.expect("Unable to get events from store")
.collect();
assert_eq!(events.len(), 2);
assert_eq!(
events,
vec![
create_proposal_submitted_event(1, "test"),
create_proposal_vote_event(3, "test")
],
);
}
fn create_connection_pool_and_migrate() -> Pool<ConnectionManager<SqliteConnection>> {
let connection_manager = ConnectionManager::<SqliteConnection>::new(":memory:");
let pool = Pool::builder()
.max_size(1)
.build(connection_manager)
.expect("Failed to build connection pool");
run_sqlite_migrations(&*pool.get().expect("Failed to get connection for migrations"))
.expect("Failed to run migrations");
pool
}
fn create_proposal() -> CircuitProposal {
CircuitProposalBuilder::default()
.with_proposal_type(&ProposalType::Create)
.with_circuit_id("WBKLF-BBBBB")
.with_circuit_hash(
"7ddc426972710adc0b2ecd49e89a9dd805fb9206bf516079724c887bedbcdf1d")
.with_circuit(
&ProposedCircuitBuilder::default()
.with_circuit_id("WBKLF-BBBBB")
.with_roster(&vec![
ProposedServiceBuilder::default()
.with_service_id("a000")
.with_service_type("scabbard")
.with_node_id(&"acme-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a001\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build().expect("Unable to build service"),
ProposedServiceBuilder::default()
.with_service_id("a001")
.with_service_type("scabbard")
.with_node_id(&"bubba-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a000\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build().expect("Unable to build service")
])
.with_members(
&vec![
ProposedNodeBuilder::default()
.with_node_id("bubba-node-000".into())
.with_endpoints(
&vec!["tcps://splinterd-node-bubba:8044".into(),
"tcps://splinterd-node-bubba-2:8044".into()])
.build().expect("Unable to build node"),
ProposedNodeBuilder::default()
.with_node_id("acme-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-acme:8044".into()])
.build().expect("Unable to build node"),
]
)
.with_circuit_version(3)
.with_application_metadata(b"test")
.with_comments("This is a test")
.with_circuit_management_type("gameroom")
.with_display_name("test_display")
.build()
.expect("Unable to build circuit")
)
.with_requester(
&PublicKey::from_bytes(parse_hex(
"0283a14e0a17cb7f665311e9b5560f4cde2b502f17e2d03223e15d90d9318d7482").unwrap()))
.with_requester_node_id("acme-node-000")
.with_votes(&vec![VoteRecordBuilder::new()
.with_public_key(
&PublicKey::from_bytes(parse_hex(
"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550",
)
.unwrap()),
)
.with_vote(&Vote::Accept)
.with_voter_node_id("bubba-node-000")
.build()
.expect("Unable to build vote record"),
VoteRecordBuilder::new()
.with_public_key(
&PublicKey::from_bytes(parse_hex(
"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550",
)
.unwrap()),
)
.with_vote(&Vote::Accept)
.with_voter_node_id("bubba-node-002")
.build()
.expect("Unable to build vote record")]
)
.build().expect("Unable to build proposals")
}
fn create_extra_proposal() -> CircuitProposal {
CircuitProposalBuilder::default()
.with_proposal_type(&ProposalType::Create)
.with_circuit_id("WBKLF-AAAAA")
.with_circuit_hash(
"7ddc426972710adc0b2ecd49e89a9dd805fb9206bf516079724c887bedbcdf1d")
.with_circuit(
&ProposedCircuitBuilder::default()
.with_circuit_id("WBKLF-AAAAA")
.with_roster(&vec![
ProposedServiceBuilder::default()
.with_service_id("a000")
.with_service_type("scabbard")
.with_node_id(&"acme-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a001\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build().expect("Unable to build service"),
ProposedServiceBuilder::default()
.with_service_id("a001")
.with_service_type("scabbard")
.with_node_id(&"gumbo-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a000\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build().expect("Unable to build service")
])
.with_members(
&vec![
ProposedNodeBuilder::default()
.with_node_id("gumbo-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-gumbo:8044".into()])
.build().expect("Unable to build node"),
ProposedNodeBuilder::default()
.with_node_id("acme-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-acme:8044".into()])
.build().expect("Unable to build node"),
]
)
.with_circuit_management_type("gameroom")
.with_circuit_status(&CircuitStatus::Active)
.build().expect("Unable to build circuit")
)
.with_requester(
&PublicKey::from_bytes(parse_hex(
"0283a14e0a17cb7f665311e9b5560f4cde2b502f17e2d03223e15d90d9318d7482").unwrap()))
.with_requester_node_id("acme-node-000")
.build().expect("Unable to build proposals")
}
fn create_circuit(circuit_id: &str, status: CircuitStatus) -> Circuit {
let nodes = create_nodes();
CircuitBuilder::default()
.with_circuit_id(circuit_id)
.with_roster(&vec![
ServiceBuilder::default()
.with_service_id("a000")
.with_service_type("scabbard")
.with_node_id("acme-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a001\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build()
.expect("Unable to build service"),
ServiceBuilder::default()
.with_service_id("a001")
.with_service_type("scabbard")
.with_node_id("bubba-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a000\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build()
.expect("Unable to build service"),
])
.with_members(&nodes)
.with_circuit_management_type("gameroom")
.with_display_name("test_display")
.with_circuit_version(3)
.with_circuit_status(&status)
.build()
.expect("Unable to build circuit")
}
fn create_circuit_from_proposal(circuit_id: &str, status: CircuitStatus) -> Circuit {
CircuitBuilder::default()
.with_circuit_id(circuit_id)
.with_roster(&vec![
ServiceBuilder::default()
.with_service_id("a000")
.with_service_type("scabbard")
.with_node_id("acme-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a001\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build()
.expect("Unable to build service"),
ServiceBuilder::default()
.with_service_id("a001")
.with_service_type("scabbard")
.with_node_id("bubba-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a000\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build()
.expect("Unable to build service"),
])
.with_members(
&vec![
CircuitNodeBuilder::default()
.with_node_id("bubba-node-000".into())
.with_endpoints(
&vec!["tcps://splinterd-node-bubba:8044".into(),
"tcps://splinterd-node-bubba-2:8044".into()])
.build().expect("Unable to build node"),
CircuitNodeBuilder::default()
.with_node_id("acme-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-acme:8044".into()])
.build().expect("Unable to build node"),
]
)
.with_circuit_management_type("gameroom")
.with_display_name("test_display")
.with_circuit_version(3)
.with_circuit_status(&status)
.build()
.expect("Unable to build circuit")
}
fn create_extra_circuit(circuit_id: &str) -> Circuit {
let nodes = create_extra_nodes();
CircuitBuilder::default()
.with_circuit_id(circuit_id)
.with_roster(&vec![
ServiceBuilder::default()
.with_service_id("a000")
.with_service_type("scabbard")
.with_node_id("acme-node-000")
.with_arguments(&vec![
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]"
.into()),
("peer_services".into(), "[\"a001\"]".into()),
])
.build()
.expect("Unable to build service"),
ServiceBuilder::default()
.with_service_id("a001")
.with_service_type("scabbard")
.with_node_id("gumbo-node-000")
.with_arguments(&vec![(
"admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]"
.into()
),(
"peer_services".into(), "[\"a000\"]".into()
)])
.build()
.expect("Unable to build service"),
])
.with_members(&nodes)
.with_circuit_management_type("other")
.build()
.expect("Unable to build circuit")
}
fn create_messages_proposal(management_type: &str) -> CircuitProposal {
CircuitProposalBuilder::default()
.with_proposal_type(&ProposalType::Create)
.with_circuit_id("WBKLF-BBBBB")
.with_circuit_hash(
"7ddc426972710adc0b2ecd49e89a9dd805fb9206bf516079724c887bedbcdf1d")
.with_circuit(
&ProposedCircuitBuilder::default()
.with_circuit_id("WBKLF-BBBBB")
.with_roster(&vec![
ProposedServiceBuilder::default()
.with_service_id("a000")
.with_service_type("scabbard")
.with_node_id(&"acme-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a001\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build().expect("Unable to build service"),
ProposedServiceBuilder::default()
.with_service_id("a001")
.with_service_type("scabbard")
.with_node_id(&"bubba-node-000")
.with_arguments(&vec![
("peer_services".into(), "[\"a000\"]".into()),
("admin_keys".into(),
"[\"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550\"]".into())
])
.build().expect("Unable to build service")
])
.with_members(
&vec![
ProposedNodeBuilder::default()
.with_node_id("bubba-node-000".into())
.with_endpoints(
&vec!["tcps://splinterd-node-bubba:8044".into(),
"tcps://splinterd-node-bubba-2:8044".into()])
.build().expect("Unable to build node"),
ProposedNodeBuilder::default()
.with_node_id("acme-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-acme:8044".into()])
.build().expect("Unable to build node"),
]
)
.with_circuit_version(1)
.with_application_metadata(b"test")
.with_comments("This is a test")
.with_circuit_management_type(management_type)
.with_display_name("test_display")
.build()
.expect("Unable to build circuit")
)
.with_requester(
&PublicKey::from_bytes(parse_hex(
"0283a14e0a17cb7f665311e9b5560f4cde2b502f17e2d03223e15d90d9318d7482").unwrap()))
.with_requester_node_id("acme-node-000")
.with_votes(&vec![VoteRecordBuilder::new()
.with_public_key(
&PublicKey::from_bytes(parse_hex(
"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550",
)
.unwrap()),
)
.with_vote(&Vote::Accept)
.with_voter_node_id("bubba-node-000")
.build()
.expect("Unable to build vote record"),
VoteRecordBuilder::new()
.with_public_key(
&PublicKey::from_bytes(parse_hex(
"035724d11cae47c8907f8bfdf510488f49df8494ff81b63825bad923733c4ac550",
)
.unwrap()),
)
.with_vote(&Vote::Accept)
.with_voter_node_id("bubba-node-002")
.build()
.expect("Unable to build vote record")]
)
.build().expect("Unable to build proposals")
}
fn create_nodes() -> Vec<CircuitNode> {
vec![
CircuitNodeBuilder::default()
.with_node_id("bubba-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-bubba:8044".into()])
.build()
.expect("Unable to build node"),
CircuitNodeBuilder::default()
.with_node_id("acme-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-acme:8044".into()])
.build()
.expect("Unable to build node"),
]
}
fn create_extra_nodes() -> Vec<CircuitNode> {
vec![
CircuitNodeBuilder::default()
.with_node_id("gumbo-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-gumbo:8044".into()])
.build()
.expect("Unable to build node"),
CircuitNodeBuilder::default()
.with_node_id("acme-node-000".into())
.with_endpoints(&vec!["tcps://splinterd-node-acme:8044".into()])
.build()
.expect("Unable to build node"),
]
}
fn create_proposal_submitted_event(event_id: i64, management_type: &str) -> AdminServiceEvent {
AdminServiceEventBuilder::new()
.with_event_id(event_id)
.with_event_type(&EventType::ProposalSubmitted)
.with_proposal(&create_messages_proposal(management_type))
.build()
.expect("Unable to build AdminServiceEvent")
}
fn create_proposal_submitted_messages_event(
management_type: &str,
) -> messages::AdminServiceEvent {
messages::AdminServiceEvent::ProposalSubmitted(messages::CircuitProposal::from(
create_messages_proposal(management_type),
))
}
fn create_circuit_ready_event(event_id: i64, management_type: &str) -> AdminServiceEvent {
AdminServiceEventBuilder::new()
.with_event_id(event_id)
.with_event_type(&EventType::CircuitReady)
.with_proposal(&create_messages_proposal(management_type))
.build()
.expect("Unable to build AdminServiceEvent")
}
fn create_circuit_ready_messages_event(management_type: &str) -> messages::AdminServiceEvent {
messages::AdminServiceEvent::CircuitReady(messages::CircuitProposal::from(
create_messages_proposal(management_type),
))
}
fn create_proposal_vote_event(event_id: i64, management_type: &str) -> AdminServiceEvent {
let requester =
&parse_hex("0283a14e0a17cb7f665311e9b5560f4cde2b502f17e2d03223e15d90d9318d7482")
.unwrap();
AdminServiceEventBuilder::new()
.with_event_id(event_id)
.with_event_type(&EventType::ProposalVote {
requester: requester.to_vec(),
})
.with_proposal(&create_messages_proposal(management_type))
.build()
.expect("Unable to build AdminServiceEvent")
}
fn create_proposal_vote_messages_event(management_type: &str) -> messages::AdminServiceEvent {
let requester =
&parse_hex("0283a14e0a17cb7f665311e9b5560f4cde2b502f17e2d03223e15d90d9318d7482")
.unwrap();
messages::AdminServiceEvent::ProposalVote((
messages::CircuitProposal::from(create_messages_proposal(management_type)),
requester.to_vec(),
))
}
}