use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use crate::{
db_connectors::postgresql::get_db,
encrypt::{decrypt_data, encrypt_data},
Client, ConversationInfo, EngineError, PostgresqlClient,
};
use super::{
models,
pagination::*,
schema::{csml_conversations, csml_messages},
};
use chrono::NaiveDateTime;
pub fn add_messages_bulk(
data: &ConversationInfo,
msgs: &[serde_json::Value],
interaction_order: i32,
direction: &str,
expires_at: Option<NaiveDateTime>,
) -> Result<(), EngineError> {
if msgs.len() == 0 {
return Ok(());
}
let db = get_db(&data.db)?;
let mut new_messages = vec![];
for (message_order, message) in msgs.iter().enumerate() {
let conversation_id = uuid::Uuid::parse_str(&data.conversation_id).unwrap();
let msg = models::NewMessages {
id: uuid::Uuid::new_v4(),
conversation_id,
flow_id: &data.context.flow,
step_id: data.context.step.get_step_ref(),
direction,
payload: encrypt_data(&message)?,
content_type: &message["content_type"].as_str().unwrap_or("text"),
message_order: message_order as i32,
interaction_order,
expires_at,
};
new_messages.push(msg);
}
diesel::insert_into(csml_messages::table)
.values(&new_messages)
.get_result::<models::Message>(&db.client)?;
Ok(())
}
pub fn delete_user_messages(client: &Client, db: &PostgresqlClient) -> Result<(), EngineError> {
let conversations: Vec<models::Conversation> = csml_conversations::table
.filter(csml_conversations::bot_id.eq(&client.bot_id))
.filter(csml_conversations::channel_id.eq(&client.channel_id))
.filter(csml_conversations::user_id.eq(&client.user_id))
.load(&db.client)?;
for conversation in conversations {
diesel::delete(
csml_messages::table.filter(csml_messages::conversation_id.eq(&conversation.id)),
)
.execute(&db.client)
.ok();
}
Ok(())
}
pub fn get_client_messages(
client: &Client,
db: &PostgresqlClient,
limit: Option<i64>,
pagination_key: Option<String>,
from_date: Option<i64>,
to_date: Option<i64>,
) -> Result<serde_json::Value, EngineError> {
let pagination_key = match pagination_key {
Some(paginate) => paginate.parse::<i64>().unwrap_or(1),
None => 1,
};
let (conversation_with_messages, total_pages) = match from_date {
Some(from_date) => {
let from_date = NaiveDateTime::from_timestamp(from_date, 0);
let to_date = match to_date {
Some(to_date) => NaiveDateTime::from_timestamp(to_date, 0),
None => chrono::Utc::now().naive_utc(),
};
let mut query = csml_conversations::table
.filter(csml_conversations::bot_id.eq(&client.bot_id))
.filter(csml_conversations::channel_id.eq(&client.channel_id))
.filter(csml_conversations::user_id.eq(&client.user_id))
.inner_join(csml_messages::table)
.filter(csml_messages::created_at.ge(from_date))
.filter(csml_messages::created_at.le(to_date))
.select((csml_conversations::all_columns, csml_messages::all_columns))
.order_by(csml_messages::created_at.desc())
.then_order_by(csml_messages::message_order.desc())
.paginate(pagination_key);
let limit_per_page = match limit {
Some(limit) => std::cmp::min(limit, 25),
None => 25,
};
query = query.per_page(limit_per_page);
query.load_and_count_pages::<(models::Conversation, models::Message)>(&db.client)?
}
None => {
let mut query = csml_conversations::table
.filter(csml_conversations::bot_id.eq(&client.bot_id))
.filter(csml_conversations::channel_id.eq(&client.channel_id))
.filter(csml_conversations::user_id.eq(&client.user_id))
.inner_join(csml_messages::table)
.select((csml_conversations::all_columns, csml_messages::all_columns))
.order_by(csml_messages::created_at.desc())
.then_order_by(csml_messages::message_order.desc())
.paginate(pagination_key);
let limit_per_page = match limit {
Some(limit) => std::cmp::min(limit, 25),
None => 25,
};
query = query.per_page(limit_per_page);
query.load_and_count_pages::<(models::Conversation, models::Message)>(&db.client)?
}
};
let (_, messages): (Vec<_>, Vec<_>) = conversation_with_messages.into_iter().unzip();
let mut msgs = vec![];
for message in messages {
let json = serde_json::json!({
"client": {
"bot_id": &client.bot_id,
"channel_id": &client.channel_id,
"user_id": &client.user_id
},
"conversation_id": message.conversation_id,
"flow_id": message.flow_id,
"step_id": message.step_id,
"direction": message.direction,
"payload": decrypt_data(message.payload)?,
"updated_at": message.updated_at.format("%Y-%m-%dT%H:%M:%S%.fZ").to_string(),
"created_at": message.created_at.format("%Y-%m-%dT%H:%M:%S%.fZ").to_string()
});
msgs.push(json);
}
match pagination_key < total_pages {
true => {
let pagination_key = (pagination_key + 1).to_string();
Ok(serde_json::json!({"messages": msgs, "pagination_key": pagination_key}))
}
false => Ok(serde_json::json!({ "messages": msgs })),
}
}