use super::PikeStoreOperations;
use crate::paging::Paging;
use crate::pike::store::diesel::{
schema::{pike_agent, pike_agent_role_assoc},
Agent, AgentList, PikeStoreError,
};
use crate::commits::MAX_COMMIT_NUM;
use crate::error::InternalError;
use crate::pike::store::diesel::models::{AgentModel, RoleAssociationModel};
use diesel::prelude::*;
use std::convert::TryInto;
/// Store operation that lists Pike agents, one page at a time.
///
/// Implemented per database backend on `PikeStoreOperations`.
pub(in crate::pike::store::diesel) trait PikeStoreListAgentsOperation {
/// Returns a page of agents, each with its role associations, together with
/// the paging information for the request.
///
/// # Arguments
///
/// * `service_id` - Service to list agents for. The implementations in this
///   file match rows with an equal `service_id`, or rows whose `service_id`
///   is NULL when `None` is given.
/// * `offset` - Number of matching agents to skip.
/// * `limit` - Maximum number of agents to return.
///
/// # Errors
///
/// Returns a `PikeStoreError` if the underlying database queries fail.
fn list_agents(
&self,
service_id: Option<&str>,
offset: i64,
limit: i64,
) -> Result<AgentList, PikeStoreError>;
}
#[cfg(feature = "postgres")]
impl<'a> PikeStoreListAgentsOperation for PikeStoreOperations<'a, diesel::pg::PgConnection> {
    /// Lists agents stored in PostgreSQL, one page at a time.
    ///
    /// Only rows whose `end_commit_num` equals `MAX_COMMIT_NUM` are considered
    /// (presumably the currently-valid records). Rows are matched on
    /// `service_id`, or on a NULL `service_id` when `None` is given. Each
    /// returned agent is paired with its matching role associations.
    ///
    /// The `total` reported in the paging information is the number of agents
    /// matching the filters across all pages, not the size of the returned
    /// page.
    ///
    /// # Errors
    ///
    /// Returns `PikeStoreError::InternalError` if any query fails or the count
    /// cannot be converted to the type expected by `Paging`.
    fn list_agents(
        &self,
        service_id: Option<&str>,
        offset: i64,
        limit: i64,
    ) -> Result<AgentList, PikeStoreError> {
        self.conn.transaction::<_, PikeStoreError, _>(|| {
            // Count every agent matching the filters, ignoring offset/limit, so
            // that callers can tell how many pages exist. Using the length of
            // the loaded page here would cap `total` at `limit`.
            let mut count_query = pike_agent::table
                .into_boxed()
                .filter(pike_agent::end_commit_num.eq(MAX_COMMIT_NUM));

            if let Some(service_id) = service_id {
                count_query = count_query.filter(pike_agent::service_id.eq(service_id));
            } else {
                count_query = count_query.filter(pike_agent::service_id.is_null());
            }

            let matching_agents = count_query
                .count()
                .get_result::<i64>(self.conn)
                .map_err(|err| {
                    PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
                })?;

            // COUNT(*) yields an i64; convert to whatever integer type
            // `Paging::new` expects for its total.
            let total = matching_agents.try_into().map_err(|err| {
                PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
            })?;

            // NOTE(review): the page query has no ORDER BY, so which rows land
            // on which page depends on the database's row order.
            let mut query = pike_agent::table
                .into_boxed()
                .select(pike_agent::all_columns)
                .offset(offset)
                .limit(limit)
                .filter(pike_agent::end_commit_num.eq(MAX_COMMIT_NUM));

            if let Some(service_id) = service_id {
                query = query.filter(pike_agent::service_id.eq(service_id));
            } else {
                query = query.filter(pike_agent::service_id.is_null());
            }

            let agent_models = query.load::<AgentModel>(self.conn).map_err(|err| {
                PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
            })?;

            let mut agents = Vec::with_capacity(agent_models.len());

            for a in agent_models {
                // Fetch the role associations belonging to this agent under
                // the same service and commit filters.
                let mut query = pike_agent_role_assoc::table
                    .into_boxed()
                    .select(pike_agent_role_assoc::all_columns)
                    .filter(
                        pike_agent_role_assoc::agent_public_key
                            .eq(&a.public_key)
                            .and(pike_agent_role_assoc::org_id.eq(&a.org_id))
                            .and(pike_agent_role_assoc::end_commit_num.eq(MAX_COMMIT_NUM)),
                    );

                if let Some(service_id) = service_id {
                    query = query.filter(pike_agent_role_assoc::service_id.eq(service_id));
                } else {
                    query = query.filter(pike_agent_role_assoc::service_id.is_null());
                }

                let roles = query
                    .load::<RoleAssociationModel>(self.conn)
                    .map_err(|err| {
                        PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
                    })?;

                agents.push(Agent::from((a, roles)));
            }

            Ok(AgentList::new(agents, Paging::new(offset, limit, total)))
        })
    }
}
#[cfg(feature = "sqlite")]
impl<'a> PikeStoreListAgentsOperation
    for PikeStoreOperations<'a, diesel::sqlite::SqliteConnection>
{
    /// Lists agents stored in SQLite, one page at a time.
    ///
    /// Only rows whose `end_commit_num` equals `MAX_COMMIT_NUM` are considered
    /// (presumably the currently-valid records). Rows are matched on
    /// `service_id`, or on a NULL `service_id` when `None` is given. Each
    /// returned agent is paired with its matching role associations.
    ///
    /// The `total` reported in the paging information is the number of agents
    /// matching the filters across all pages, not the size of the returned
    /// page.
    ///
    /// # Errors
    ///
    /// Returns `PikeStoreError::InternalError` if any query fails or the count
    /// cannot be converted to the type expected by `Paging`.
    fn list_agents(
        &self,
        service_id: Option<&str>,
        offset: i64,
        limit: i64,
    ) -> Result<AgentList, PikeStoreError> {
        self.conn.transaction::<_, PikeStoreError, _>(|| {
            // Count every agent matching the filters, ignoring offset/limit, so
            // that callers can tell how many pages exist. Using the length of
            // the loaded page here would cap `total` at `limit`.
            let mut count_query = pike_agent::table
                .into_boxed()
                .filter(pike_agent::end_commit_num.eq(MAX_COMMIT_NUM));

            if let Some(service_id) = service_id {
                count_query = count_query.filter(pike_agent::service_id.eq(service_id));
            } else {
                count_query = count_query.filter(pike_agent::service_id.is_null());
            }

            let matching_agents = count_query
                .count()
                .get_result::<i64>(self.conn)
                .map_err(|err| {
                    PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
                })?;

            // COUNT(*) yields an i64; convert to whatever integer type
            // `Paging::new` expects for its total.
            let total = matching_agents.try_into().map_err(|err| {
                PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
            })?;

            // NOTE(review): the page query has no ORDER BY, so which rows land
            // on which page depends on the database's row order.
            let mut query = pike_agent::table
                .into_boxed()
                .select(pike_agent::all_columns)
                .offset(offset)
                .limit(limit)
                .filter(pike_agent::end_commit_num.eq(MAX_COMMIT_NUM));

            if let Some(service_id) = service_id {
                query = query.filter(pike_agent::service_id.eq(service_id));
            } else {
                query = query.filter(pike_agent::service_id.is_null());
            }

            let agent_models = query.load::<AgentModel>(self.conn).map_err(|err| {
                PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
            })?;

            let mut agents = Vec::with_capacity(agent_models.len());

            for a in agent_models {
                // Fetch the role associations belonging to this agent under
                // the same service and commit filters.
                let mut query = pike_agent_role_assoc::table
                    .into_boxed()
                    .select(pike_agent_role_assoc::all_columns)
                    .filter(
                        pike_agent_role_assoc::agent_public_key
                            .eq(&a.public_key)
                            .and(pike_agent_role_assoc::org_id.eq(&a.org_id))
                            .and(pike_agent_role_assoc::end_commit_num.eq(MAX_COMMIT_NUM)),
                    );

                if let Some(service_id) = service_id {
                    query = query.filter(pike_agent_role_assoc::service_id.eq(service_id));
                } else {
                    query = query.filter(pike_agent_role_assoc::service_id.is_null());
                }

                let roles = query
                    .load::<RoleAssociationModel>(self.conn)
                    .map_err(|err| {
                        PikeStoreError::InternalError(InternalError::from_source(Box::new(err)))
                    })?;

                agents.push(Agent::from((a, roles)));
            }

            Ok(AgentList::new(agents, Paging::new(offset, limit, total)))
        })
    }
}