use crate::db_connectors::dynamodb::{Bot, Conversation, Memory, Message};
use crate::{
data::{DynamoBot, DynamoBotBincode, DynamoDbClient},
encrypt::decrypt_data,
Client, EngineError,
};
use rusoto_core::RusotoError;
use rusoto_dynamodb::{
BatchGetItemError, BatchGetItemInput, BatchWriteItemError, BatchWriteItemInput, DynamoDb,
GetItemError, GetItemInput,
};
use std::{thread, time};
use rand::Rng;
const RETRY_BASE: u64 = 500;
const MAX_INTERVAL_LIMIT: u64 = 60_000;
const MAX_ELAPSED_TIME_MILLIS: u64 = 600_000;
pub fn get_date_time() -> String {
return chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S.%3fZ")
.to_string();
}
pub fn get_table_name() -> Result<String, EngineError> {
match std::env::var("AWS_DYNAMODB_TABLE") {
Ok(val) => return Ok(val),
_ => {
return Err(EngineError::Manager(
"Missing AWS_DYNAMODB_TABLE env var".to_owned(),
))
}
}
}
pub fn make_hash(client: &Client) -> String {
format!(
"bot_id:{}#channel_id:{}#user_id:{}",
client.bot_id, client.channel_id, client.user_id
)
}
pub fn make_range(args: &[&str]) -> String {
let mut res = "".to_owned();
for arg in args.iter() {
if res.len() > 0 {
res = res + "#";
}
res = res + arg.to_owned();
}
res.to_owned()
}
pub fn execute_batch_write_query(
db: &mut DynamoDbClient,
input: BatchWriteItemInput,
) -> Result<(), RusotoError<BatchWriteItemError>> {
let mut retry_times = 1;
let mut rng = rand::thread_rng();
let now = time::Instant::now();
loop {
match db
.runtime
.block_on(db.client.batch_write_item(input.clone()))
{
Ok(_) => return Ok(()),
Err(RusotoError::Service(BatchWriteItemError::ProvisionedThroughputExceeded(err))) => {
let interval = std::cmp::min(MAX_INTERVAL_LIMIT, RETRY_BASE * 2 * retry_times);
let interval_jitter = rng.gen_range(0..interval);
let duration = time::Duration::from_millis(interval_jitter);
thread::sleep(duration);
if now.elapsed() >= time::Duration::from_millis(MAX_ELAPSED_TIME_MILLIS) {
return Err(RusotoError::Service(
BatchWriteItemError::ProvisionedThroughputExceeded(err),
));
}
}
Err(err) => return Err(err),
}
retry_times += 1;
}
}
pub fn execute_bot_version_batch_get_query(
db: &mut DynamoDbClient,
input: BatchGetItemInput,
) -> Result<Vec<serde_json::Value>, EngineError> {
let mut retry_times = 1;
let mut rng = rand::thread_rng();
let now = time::Instant::now();
loop {
match db.runtime.block_on(db.client.batch_get_item(input.clone())) {
Ok(output) => {
let items = match output.responses {
None => return Ok(vec![]),
Some(items) if items.len() == 0 => return Ok(vec![]),
Some(items) => items.clone(),
};
let mut bots = vec![];
for (_, item) in items {
for item in item {
let data: Bot = serde_dynamodb::from_hashmap(item.to_owned())?;
let csml_bot: DynamoBot = match base64::decode(&data.bot) {
Ok(base64decoded) => {
match bincode::deserialize::<DynamoBotBincode>(&base64decoded[..]) {
Ok(bot) => bot.to_bot(),
Err(_) => serde_json::from_str(&data.bot).unwrap(),
}
}
Err(_) => serde_json::from_str(&data.bot).unwrap(),
};
let mut json = serde_json::json!({
"version_id": data.version_id,
"id": data.id,
"name": csml_bot.name,
"default_flow": csml_bot.default_flow,
"engine_version": data.engine_version,
"created_at": data.created_at
});
if let Some(custom_components) = csml_bot.custom_components {
json["custom_components"] = serde_json::json!(custom_components);
}
bots.push(json);
}
}
return Ok(bots);
}
Err(RusotoError::Service(BatchGetItemError::ProvisionedThroughputExceeded(err))) => {
let interval = std::cmp::min(MAX_INTERVAL_LIMIT, RETRY_BASE * 2 * retry_times);
let interval_jitter = rng.gen_range(0..interval);
let duration = time::Duration::from_millis(interval_jitter);
thread::sleep(duration);
if now.elapsed() >= time::Duration::from_millis(MAX_ELAPSED_TIME_MILLIS) {
return Err(RusotoError::Service(
BatchGetItemError::ProvisionedThroughputExceeded(err),
)
.into());
}
}
Err(err) => return Err(err.into()),
}
retry_times += 1;
}
}
pub fn execute_messages_batch_get_query(
db: &mut DynamoDbClient,
input: BatchGetItemInput,
) -> Result<Vec<serde_json::Value>, EngineError> {
let mut retry_times = 1;
let mut rng = rand::thread_rng();
let now = time::Instant::now();
loop {
match db.runtime.block_on(db.client.batch_get_item(input.clone())) {
Ok(output) => {
let items = match output.responses {
None => return Ok(vec![]),
Some(items) if items.len() == 0 => return Ok(vec![]),
Some(items) => items.clone(),
};
let mut messages = vec![];
for (_, item) in items {
for item in item {
let message: Message = serde_dynamodb::from_hashmap(item)?;
let json = serde_json::json!({
"client": message.client,
"conversation_id": message.conversation_id,
"flow_id": message.flow_id,
"step_id": message.step_id,
"message_order": message.message_order,
"interaction_order": message.interaction_order,
"direction": message.direction,
"payload": decrypt_data(message.payload)?,
"created_at": message.created_at
});
messages.push(json)
}
}
return Ok(messages);
}
Err(RusotoError::Service(BatchGetItemError::ProvisionedThroughputExceeded(err))) => {
let interval = std::cmp::min(MAX_INTERVAL_LIMIT, RETRY_BASE * 2 * retry_times);
let interval_jitter = rng.gen_range(0..interval);
let duration = time::Duration::from_millis(interval_jitter);
thread::sleep(duration);
if now.elapsed() >= time::Duration::from_millis(MAX_ELAPSED_TIME_MILLIS) {
return Err(RusotoError::Service(
BatchGetItemError::ProvisionedThroughputExceeded(err),
)
.into());
}
}
Err(err) => return Err(err.into()),
}
retry_times += 1;
}
}
pub fn execute_memory_batch_get_query(
db: &mut DynamoDbClient,
input: BatchGetItemInput,
) -> Result<Vec<serde_json::Value>, EngineError> {
let mut retry_times = 1;
let mut rng = rand::thread_rng();
let now = time::Instant::now();
loop {
match db.runtime.block_on(db.client.batch_get_item(input.clone())) {
Ok(output) => {
let items = match output.responses {
None => return Ok(vec![]),
Some(items) if items.len() == 0 => return Ok(vec![]),
Some(items) => items.clone(),
};
let mut memories = vec![];
for (_, item) in items {
for item in item {
let memory: Memory = serde_dynamodb::from_hashmap(item)?;
let json = serde_json::json!({
"key": memory.key,
"value": decrypt_data(memory.value.unwrap())?,
"created_at": memory.created_at,
});
memories.push(json)
}
}
return Ok(memories);
}
Err(RusotoError::Service(BatchGetItemError::ProvisionedThroughputExceeded(err))) => {
let interval = std::cmp::min(MAX_INTERVAL_LIMIT, RETRY_BASE * 2 * retry_times);
let interval_jitter = rng.gen_range(0..interval);
let duration = time::Duration::from_millis(interval_jitter);
thread::sleep(duration);
if now.elapsed() >= time::Duration::from_millis(MAX_ELAPSED_TIME_MILLIS) {
return Err(RusotoError::Service(
BatchGetItemError::ProvisionedThroughputExceeded(err),
)
.into());
}
}
Err(err) => return Err(err.into()),
}
retry_times += 1;
}
}
pub fn execute_conversations_batch_get_query(
db: &mut DynamoDbClient,
input: BatchGetItemInput,
) -> Result<Vec<Conversation>, EngineError> {
let mut retry_times = 1;
let mut rng = rand::thread_rng();
let now = time::Instant::now();
loop {
match db.runtime.block_on(db.client.batch_get_item(input.clone())) {
Ok(output) => {
let items = match output.responses {
None => return Ok(vec![]),
Some(items) if items.len() == 0 => return Ok(vec![]),
Some(items) => items.clone(),
};
let mut conversations = vec![];
for (_, item) in items {
for item in item {
let conversation: Conversation = serde_dynamodb::from_hashmap(item)?;
conversations.push(conversation)
}
}
return Ok(conversations);
}
Err(RusotoError::Service(BatchGetItemError::ProvisionedThroughputExceeded(err))) => {
let interval = std::cmp::min(MAX_INTERVAL_LIMIT, RETRY_BASE * 2 * retry_times);
let interval_jitter = rng.gen_range(0..interval);
let duration = time::Duration::from_millis(interval_jitter);
thread::sleep(duration);
if now.elapsed() >= time::Duration::from_millis(MAX_ELAPSED_TIME_MILLIS) {
return Err(RusotoError::Service(
BatchGetItemError::ProvisionedThroughputExceeded(err),
)
.into());
}
}
Err(err) => return Err(err.into()),
}
retry_times += 1;
}
}
pub fn execute_conversation_get_query(
db: &mut DynamoDbClient,
input: GetItemInput,
) -> Result<Conversation, EngineError> {
let mut retry_times = 1;
let mut rng = rand::thread_rng();
let now = time::Instant::now();
loop {
match db.runtime.block_on(db.client.get_item(input.clone())) {
Ok(item) => {
let conversation: Conversation = serde_dynamodb::from_hashmap(item.item.unwrap())?;
return Ok(conversation);
}
Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(err))) => {
let interval = std::cmp::min(MAX_INTERVAL_LIMIT, RETRY_BASE * 2 * retry_times);
let interval_jitter = rng.gen_range(0..interval);
let duration = time::Duration::from_millis(interval_jitter);
thread::sleep(duration);
if now.elapsed() >= time::Duration::from_millis(MAX_ELAPSED_TIME_MILLIS) {
return Err(RusotoError::Service(
BatchGetItemError::ProvisionedThroughputExceeded(err),
)
.into());
}
}
Err(err) => return Err(err.into()),
}
retry_times += 1;
}
}