use crate::cache::get_hash;
use crate::common::{
deserialize_from_json, get_hash_item, get_items, insert_items, serialize_to_json,
};
use crate::constants::{
AccountKeys, ACCOUNTS_KEY, ACCOUNT_IDENTS_KEY, build_trunk_key
};
use redis::pipe;
use cal_core::accounting::Address;
use cal_core::device::device::DeviceStruct;
use cal_core::{Account, AccountLite, AccountUpdate, Asset, Hook, RedisEvent, Trunk, DDI};
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, RedisError, RedisResult, Value};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Display;
use crate::publish_event;
/// Loads every account stored under `ACCOUNTS_KEY`.
///
/// Delegates to `get_items`, which is expected to decode each stored entry
/// into an [`AccountLite`].
///
/// # Errors
/// Propagates any `RedisError` reported by `get_items`.
pub async fn get_accounts(con: MultiplexedConnection) -> Result<Vec<AccountLite>, RedisError> {
    let accounts: Vec<AccountLite> = get_items(con, ACCOUNTS_KEY).await?;
    Ok(accounts)
}
/// Resolves an account through the ident index.
///
/// `key` is looked up in `ACCOUNT_IDENTS_KEY` via `get_hash`. When an account
/// id is registered for it, the account is loaded with [`get_account_by_id`];
/// otherwise `Ok(None)` is returned. Both outcomes are traced on stdout.
///
/// # Errors
/// Propagates any `RedisError` from the ident lookup or the account fetch.
pub async fn get_account_by_ident(
    con: MultiplexedConnection,
    key: &str,
) -> Result<Option<AccountLite>, RedisError> {
    let resolved = get_hash(con.clone(), ACCOUNT_IDENTS_KEY, key).await?;

    if let Some(account_id) = resolved {
        println!("get_account_by_ident: Found account ident {:?}", account_id);
        return get_account_by_id(con, &account_id).await;
    }

    println!("get_account_by_ident: No account ident for {:?}", key);
    Ok(None)
}
/// Fetches a single account from `ACCOUNTS_KEY` by its id.
///
/// Returns `Ok(None)` when `get_hash_item` finds nothing for `key`.
///
/// # Errors
/// Propagates any `RedisError` reported by `get_hash_item`.
pub async fn get_account_by_id(
    con: MultiplexedConnection,
    key: &str,
) -> Result<Option<AccountLite>, RedisError> {
    let account: Option<AccountLite> = get_hash_item(con, ACCOUNTS_KEY, key).await?;
    Ok(account)
}
/// Persists a full account together with its child collections and then
/// publishes an `AccountCreate` event.
///
/// The work happens in this order:
/// 1. The `AccountLite` projection and every child record (DDIs, devices,
///    trunks, hooks, assets, addresses) are serialized to JSON. Nothing has
///    been written yet, so a serialization failure aborts cleanly.
/// 2. The `AccountLite` JSON is stored in `ACCOUNTS_KEY` under the account id.
/// 3. Account idents are registered via `build_account_idents`.
/// 4. The child collections are written in one pipeline. Empty collections
///    are skipped; the addresses hash always receives the `"default"` entry.
/// 5. The per-account device ident index is rebuilt.
/// 6. The event is published.
///
/// # Errors
/// Returns a `RedisError` if serialization fails or any Redis command fails.
/// The writes are not transactional: a failure after step 2 leaves the
/// earlier writes in place.
pub async fn insert_account(
    mut con: MultiplexedConnection,
    account: Account,
) -> Result<(), RedisError> {
    let account_lite: AccountLite = account.clone().into();
    let value = serialize_to_json(&account_lite)?;

    // Queue all child collections first. Serialization errors are propagated
    // instead of being replaced by an empty string that could never be
    // deserialized again.
    let mut batch = pipe();
    if !account.ddis.is_empty() {
        let key = AccountKeys::ddis(&account.id);
        let entries = account
            .ddis
            .iter()
            .map(|ddi| serialize_to_json(ddi).map(|json| (ddi.id.clone(), json)))
            .collect::<Result<Vec<(String, String)>, _>>()?;
        batch.hset_multiple(&key, &entries);
    }
    if !account.devices.is_empty() {
        let key = AccountKeys::devices(&account.id);
        let entries = account
            .devices
            .iter()
            .map(|device| serialize_to_json(device).map(|json| (device.id.clone(), json)))
            .collect::<Result<Vec<(String, String)>, _>>()?;
        batch.hset_multiple(&key, &entries);
    }
    if !account.trunks.is_empty() {
        let key = AccountKeys::trunks(&account.id);
        let entries = account
            .trunks
            .iter()
            .map(|trunk| serialize_to_json(trunk).map(|json| (trunk.id.clone(), json)))
            .collect::<Result<Vec<(String, String)>, _>>()?;
        batch.hset_multiple(&key, &entries);
    }
    if !account.hooks.is_empty() {
        let key = AccountKeys::hooks(&account.id);
        let entries = account
            .hooks
            .iter()
            .map(|hook| serialize_to_json(hook).map(|json| (hook.id.clone(), json)))
            .collect::<Result<Vec<(String, String)>, _>>()?;
        batch.hset_multiple(&key, &entries);
    }
    if !account.assets.is_empty() {
        let key = AccountKeys::assets(&account.id);
        let entries = account
            .assets
            .iter()
            .map(|asset| serialize_to_json(asset).map(|json| (asset.id.clone(), json)))
            .collect::<Result<Vec<(String, String)>, _>>()?;
        batch.hset_multiple(&key, &entries);
    }

    // The primary address is always stored under the fixed "default" field,
    // so this hash is never empty.
    let addresses_key = AccountKeys::addresses(&account.id);
    let mut address_entries: Vec<(String, String)> =
        Vec::with_capacity(account.addresses.len() + 1);
    address_entries.push((
        "default".to_string(),
        serialize_to_json(&account.address)?,
    ));
    for address in &account.addresses {
        address_entries.push((address.id.clone(), serialize_to_json(address)?));
    }
    batch.hset_multiple(&addresses_key, &address_entries);

    // Everything serialized successfully; start writing.
    let _: Value = con.hset(ACCOUNTS_KEY, &account.id, value).await?;
    build_account_idents(&mut con, &account).await?;
    let _: () = batch.query_async(&mut con).await?;
    insert_device_idents(con.clone(), &account.id, &account.devices, &account.ddis).await?;

    // `account` is not needed afterwards, so it is moved into the event
    // rather than cloned.
    let event = RedisEvent::AccountCreate(AccountUpdate { payload: account });
    publish_event(&mut con, event).await?;
    Ok(())
}
/// Reads every item stored under `key` and decodes it as `T`.
///
/// Thin generic wrapper around `get_items`, shared by the per-collection
/// getters in this module.
async fn get_collection<T: DeserializeOwned>(
    con: MultiplexedConnection,
    key: &str,
) -> Result<Vec<T>, RedisError> {
    let items: Vec<T> = get_items(con, key).await?;
    Ok(items)
}
/// Reads one item of the collection stored under `key`.
///
/// `item_id` is rendered with its `Display` implementation and used as the
/// field passed to `get_hash_item`.
async fn get_collection_item<T: DeserializeOwned, I: Display>(
    con: MultiplexedConnection,
    key: &str,
    item_id: I,
) -> Result<Option<T>, RedisError> {
    let field = item_id.to_string();
    get_hash_item(con, key, &field).await
}
/// Registers every identifier that should resolve to `account`.
///
/// The following fields are written to `ACCOUNT_IDENTS_KEY`, each mapping to
/// the account id:
/// - the account id itself,
/// - the account domain,
/// - the name of every DDI,
/// - `<username>@connect.callable.io` for every device that has a client.
///
/// In addition, for every (DDI, trunk) pair the DDI name is mapped to the
/// account id inside the hash named by `build_trunk_key(trunk.ip)`.
///
/// # Errors
/// Propagates any `RedisError` from the ident write or the trunk pipeline.
async fn build_account_idents(
    con: &mut MultiplexedConnection,
    account: &Account,
) -> Result<(), RedisError> {
    let owner = &account.id;

    let mut idents: Vec<(String, String)> = vec![
        (owner.clone(), owner.clone()),
        (account.domain.clone(), owner.clone()),
    ];
    idents.extend(
        account
            .ddis
            .iter()
            .map(|ddi| (ddi.name.clone(), owner.clone())),
    );
    idents.extend(
        account
            .devices
            .iter()
            .filter_map(|device| device.client.as_ref())
            .map(|client| {
                (
                    format!("{}@connect.callable.io", client.username),
                    owner.clone(),
                )
            }),
    );

    // Queue one HSET per (DDI, trunk) pair, DDI-major, before any command is
    // sent.
    let mut trunk_batch = pipe();
    for ddi in &account.ddis {
        for trunk in &account.trunks {
            let trunk_key = build_trunk_key(&trunk.ip);
            trunk_batch.hset(&trunk_key, &ddi.name, owner);
        }
    }

    let _: Value = con.hset_multiple(ACCOUNT_IDENTS_KEY, &idents).await?;

    // Without at least one DDI and one trunk nothing was queued above.
    if account.ddis.is_empty() || account.trunks.is_empty() {
        return Ok(());
    }
    let _: () = trunk_batch.query_async(con).await?;
    Ok(())
}
/// Returns all devices stored for `account_id`.
///
/// Reads the collection named by `AccountKeys::devices(account_id)`.
pub async fn get_devices(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Vec<DeviceStruct>, RedisError> {
    let devices_key = AccountKeys::devices(account_id);
    get_collection(con, &devices_key).await
}
/// Returns the device `device_id` of `account_id`, or `None` when the lookup
/// in `AccountKeys::devices(account_id)` finds nothing.
pub async fn get_device(
    con: MultiplexedConnection,
    account_id: &str,
    device_id: &str,
) -> Result<Option<DeviceStruct>, RedisError> {
    let devices_key = AccountKeys::devices(account_id);
    get_collection_item(con, &devices_key, device_id).await
}
/// Resolves a device through the account's device ident index.
///
/// `id` is looked up as a field of `AccountKeys::device_idents(account_id)`.
/// When it maps to a device id, that device is loaded with [`get_device`];
/// otherwise `Ok(None)` is returned. Progress is traced on stdout.
///
/// # Errors
/// Propagates any `RedisError` from the ident lookup or the device fetch.
pub async fn get_device_by_ident(
    con: MultiplexedConnection,
    account_id: &str,
    id: &str,
) -> Result<Option<DeviceStruct>, RedisError> {
    println!("Getting device by ident {}", id);

    let idents_key = AccountKeys::device_idents(account_id);
    let mut lookup_con = con.clone();
    let device_id: Option<String> = lookup_con.hget(&idents_key, id).await?;

    if let Some(device_id) = device_id {
        println!("Found device ident: {}, getting device by id: {}", id, device_id);
        return get_device(con, account_id, device_id.as_str()).await;
    }
    Ok(None)
}
/// Returns the device registered under the `"OUTBOUND_ROUTE"` ident for
/// `account_id`, if any.
pub async fn get_outbound_device(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Option<DeviceStruct>, RedisError> {
    // Fixed ident under which the outbound-route device is looked up.
    const OUTBOUND_IDENT: &str = "OUTBOUND_ROUTE";
    get_device_by_ident(con, account_id, OUTBOUND_IDENT).await
}
/// Stores `devices` in the collection named by
/// `AccountKeys::devices(account_id)`, keyed by each device's `id`.
///
/// Storage is delegated to `insert_items`.
pub async fn insert_devices(
    con: MultiplexedConnection,
    account_id: &str,
    devices: &[DeviceStruct],
) -> RedisResult<Value> {
    let devices_key = AccountKeys::devices(account_id);
    insert_items(con, &devices_key, devices, |d| d.id.clone()).await
}
/// Returns all DDIs stored for `account_id`.
///
/// Reads the collection named by `AccountKeys::ddis(account_id)`.
pub async fn get_ddis(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Vec<DDI>, RedisError> {
    let ddis_key = AccountKeys::ddis(account_id);
    get_collection(con, &ddis_key).await
}
/// Returns the DDI `ddi_id` of `account_id`, or `None` when the lookup in
/// `AccountKeys::ddis(account_id)` finds nothing.
pub async fn get_ddi(
    con: MultiplexedConnection,
    account_id: &str,
    ddi_id: &str,
) -> Result<Option<DDI>, RedisError> {
    let ddis_key = AccountKeys::ddis(account_id);
    get_collection_item(con, &ddis_key, ddi_id).await
}
/// Stores `ddis` in the collection named by `AccountKeys::ddis(account_id)`,
/// keyed by each DDI's `id`.
///
/// Storage is delegated to `insert_items`.
pub async fn insert_ddis(
    con: MultiplexedConnection,
    account_id: &str,
    ddis: &[DDI],
) -> RedisResult<Value> {
    let ddis_key = AccountKeys::ddis(account_id);
    insert_items(con, &ddis_key, ddis, |d| d.id.clone()).await
}
/// Returns all hooks stored for `account_id`.
///
/// Reads the collection named by `AccountKeys::hooks(account_id)`.
pub async fn get_hooks(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Vec<Hook>, RedisError> {
    let hooks_key = AccountKeys::hooks(account_id);
    get_collection(con, &hooks_key).await
}
/// Returns the hook `hook_id` of `account_id`, or `None` when the lookup in
/// `AccountKeys::hooks(account_id)` finds nothing.
pub async fn get_hook(
    con: MultiplexedConnection,
    account_id: &str,
    hook_id: &str,
) -> Result<Option<Hook>, RedisError> {
    let hooks_key = AccountKeys::hooks(account_id);
    get_collection_item(con, &hooks_key, hook_id).await
}
/// Stores `hooks` in the collection named by `AccountKeys::hooks(account_id)`,
/// keyed by each hook's `id`.
///
/// Storage is delegated to `insert_items`.
pub async fn insert_hooks(
    con: MultiplexedConnection,
    account_id: &str,
    hooks: &[Hook],
) -> RedisResult<Value> {
    let hooks_key = AccountKeys::hooks(account_id);
    insert_items(con, &hooks_key, hooks, |h| h.id.clone()).await
}
/// Returns all trunks stored for `account_id`.
///
/// Reads the collection named by `AccountKeys::trunks(account_id)`.
pub async fn get_trunks(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Vec<Trunk>, RedisError> {
    let trunks_key = AccountKeys::trunks(account_id);
    get_collection(con, &trunks_key).await
}
/// Returns the trunk `trunk_id` of `account_id`, or `None` when the lookup in
/// `AccountKeys::trunks(account_id)` finds nothing.
pub async fn get_trunk(
    con: MultiplexedConnection,
    account_id: &str,
    trunk_id: &str,
) -> Result<Option<Trunk>, RedisError> {
    let trunks_key = AccountKeys::trunks(account_id);
    get_collection_item(con, &trunks_key, trunk_id).await
}
/// Finds the account that owns a DDI reachable through a given trunk IP.
///
/// `ddi_id` is looked up as a field of the hash named by
/// `build_trunk_key(trunk_ip)`. The stored value is then resolved to an
/// account with [`get_account_by_ident`]. Returns `Ok(None)` when the trunk
/// hash has no entry for `ddi_id`.
///
/// NOTE(review): `build_account_idents` writes these trunk hash fields using
/// the DDI *name*, so callers presumably pass the DDI name here. Confirm.
///
/// # Errors
/// Propagates any `RedisError` from the trunk lookup or the account fetch.
pub async fn get_trunk_and_ddi(
    con: MultiplexedConnection,
    ddi_id: &str,
    trunk_ip: &str,
) -> Result<Option<AccountLite>, RedisError> {
    let trunk_key = build_trunk_key(trunk_ip);
    let mut lookup_con = con.clone();
    let owner: Option<String> = lookup_con.hget(&trunk_key, ddi_id).await?;

    if let Some(account_id) = owner {
        return get_account_by_ident(con, &account_id).await;
    }
    Ok(None)
}
/// Stores `trunks` in the collection named by
/// `AccountKeys::trunks(account_id)`, keyed by each trunk's `id`.
///
/// Storage is delegated to `insert_items`.
pub async fn insert_trunks(
    con: MultiplexedConnection,
    account_id: &str,
    trunks: &[Trunk],
) -> RedisResult<Value> {
    let trunks_key = AccountKeys::trunks(account_id);
    insert_items(con, &trunks_key, trunks, |t| t.id.clone()).await
}
/// Returns all assets stored for `account_id`.
///
/// Reads the collection named by `AccountKeys::assets(account_id)`.
pub async fn get_assets(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Vec<Asset>, RedisError> {
    let assets_key = AccountKeys::assets(account_id);
    get_collection(con, &assets_key).await
}
/// Returns the asset `asset_id` of `account_id`, or `None` when the lookup in
/// `AccountKeys::assets(account_id)` finds nothing.
pub async fn get_asset(
    con: MultiplexedConnection,
    account_id: &str,
    asset_id: &str,
) -> Result<Option<Asset>, RedisError> {
    let assets_key = AccountKeys::assets(account_id);
    get_collection_item(con, &assets_key, asset_id).await
}
/// Stores `assets` in the collection named by
/// `AccountKeys::assets(account_id)`, keyed by each asset's `id`.
///
/// Storage is delegated to `insert_items`.
pub async fn insert_assets(
    con: MultiplexedConnection,
    account_id: &str,
    assets: &[Asset],
) -> RedisResult<Value> {
    let assets_key = AccountKeys::assets(account_id);
    insert_items(con, &assets_key, assets, |a| a.id.clone()).await
}
/// Returns all addresses stored for `account_id`.
///
/// Reads the collection named by `AccountKeys::addresses(account_id)`.
pub async fn get_addresses(
    con: MultiplexedConnection,
    account_id: &str,
) -> Result<Vec<Address>, RedisError> {
    let addresses_key = AccountKeys::addresses(account_id);
    get_collection(con, &addresses_key).await
}
/// Returns the address `address_id` of `account_id`, or `None` when the
/// lookup in `AccountKeys::addresses(account_id)` finds nothing.
pub async fn get_address(
    con: MultiplexedConnection,
    account_id: &str,
    address_id: &str,
) -> Result<Option<Address>, RedisError> {
    let addresses_key = AccountKeys::addresses(account_id);
    get_collection_item(con, &addresses_key, address_id).await
}
/// Writes an account's addresses into the hash named by
/// `AccountKeys::addresses(account_id)`.
///
/// `default_address` is stored under the fixed field `"default"`; every entry
/// of `additional_addresses` is stored under its own `id`.
///
/// # Errors
/// Returns an error if any address fails to serialize or if the Redis write
/// fails.
async fn insert_addresses(
    mut con: MultiplexedConnection,
    account_id: &str,
    default_address: Address,
    additional_addresses: &[Address],
) -> RedisResult<Value> {
    let hash_key = AccountKeys::addresses(account_id);

    let default_json = serialize_to_json(&default_address)?;
    let mut fields: Vec<(String, String)> = Vec::with_capacity(1 + additional_addresses.len());
    fields.push(("default".to_string(), default_json));

    for extra in additional_addresses {
        let json = serialize_to_json(extra)?;
        fields.push((extra.id.clone(), json));
    }

    con.hset_multiple(hash_key, &fields).await
}
async fn insert_device_idents(
mut con: MultiplexedConnection,
account_id: &str,
devices: &[DeviceStruct],
ddis: &[DDI],
) -> RedisResult<Value> {
let key = AccountKeys::device_idents(account_id);
let _: Value = con.del(&key).await?;
let mut entries: Vec<(String, String)> = Vec::new();
for device in devices {
if device.clone().start_route.is_some() {
for ddi in ddis {
if device.clone().start_route.unwrap().ddis.contains(&ddi.id) {
entries.push((ddi.name.clone(), device.id.clone()));
}
}
entries.push((device.id.clone(), device.id.clone()));
entries.push((device.extension.to_string(), device.id.clone()));
}
}
for device in devices {
if device.clone().regex_route.is_some() {
entries.push(("OUTBOUND_ROUTE".to_string(), device.id.clone()));
}
}
con.hset_multiple(&key, &entries).await
}
pub async fn insert_account_idents(
mut con: MultiplexedConnection,
items: &[(String, String)],
) -> RedisResult<Value> {
con.hset_multiple(ACCOUNT_IDENTS_KEY, items).await
}