use crate::RequestContext;
use anyhow::{Context, Result};
use dbui_core::{Project, RequestMessage, ResponseMessage};
use uuid::Uuid;
#[derive(Debug)]
pub struct MessageHandler {
connection_id: Uuid,
project: Project,
ctx: RequestContext,
log: slog::Logger
}
impl MessageHandler {
pub fn new(connection_id: Uuid, project: Project, ctx: RequestContext) -> MessageHandler {
let log = ctx
.log()
.new(slog::o!("connection" => format!("{}", connection_id), "project" => project.key().clone(), "service" => "message_handler"));
MessageHandler {
connection_id,
project,
ctx,
log
}
}
pub fn connection_id(&self) -> &Uuid {
&self.connection_id
}
pub fn project(&self) -> &Project {
&self.project
}
pub fn ctx(&self) -> &RequestContext {
&self.ctx
}
pub fn on_open(&self) -> Result<Vec<ResponseMessage>> {
Ok(vec![ResponseMessage::Connected {
connection_id: *self.connection_id(),
user_id: *self.ctx().user_id(),
p: Box::new(self.project.to_description()),
u: Box::new((*self.ctx.user_profile()).clone()),
b: !self.ctx.app().verbose()
}])
}
pub fn on_closed(&self) -> Vec<ResponseMessage> {
Vec::new()
}
pub fn on_message(&self, msg: RequestMessage) -> Result<()> {
match msg {
RequestMessage::Ping { v } => self.send_to_self(ResponseMessage::Pong { v }),
RequestMessage::RunQuery { id, ctx, sql } => self.run_query(id, &ctx, &sql),
RequestMessage::RefreshSchema { full } => self.refresh_schema(full),
msg => {
slog::warn!(self.log, "Unhandled RequestMessage [{:?}]", msg);
Ok(())
}
}
}
pub fn on_error(&self) {}
pub fn log(&self) -> &slog::Logger {
&self.log
}
fn run_sql(&self, id: Uuid, sql: &str) -> Result<Vec<postgres::row::Row>> {
let mut conn = self.ctx.app().open_db(self.project.key())?;
let stmt = conn.prepare(sql).with_context(|| format!("Cannot parse query [{}]", id))?;
conn.query(&stmt, &[]).with_context(|| format!("Cannot run query [{}]", id))
}
fn run_query(&self, id: Uuid, ctx: &str, sql: &str) -> Result<()> {
slog::info!(self.log(), "Running SQL as [{}/{}]: {}", ctx, id, sql);
let start_ms = dbui_core::util::current_time_ms();
let rows = self.run_sql(id, sql)?;
let duration_ms = (dbui_core::util::current_time_ms() - start_ms) as i32;
let result = dbui_database::from_rows(id, ctx, sql, rows, duration_ms)?;
self.send_to_self(ResponseMessage::SqlResponse { result })
}
fn refresh_schema(&self, full: bool) -> Result<()> {
let conn = self.ctx.app().open_db(self.project.key())?;
self.send_to_self(dbui_database::schema::refresh::refresh_schema(
&self.project,
conn,
self.log(),
full
)?)
}
fn send_to_self(&self, msg: ResponseMessage) -> Result<()> {
self.ctx().app().send_connection(self.connection_id(), msg);
Ok(())
}
fn _send_to_channel(&self, msg: ResponseMessage) -> Result<()> {
self.ctx().app().send_channel(self.project().key(), msg);
Ok(())
}
fn _send_to_channel_except_self(&self, msg: ResponseMessage) -> Result<()> {
self
.ctx()
.app()
.send_channel_except(self.project().key(), vec![self.connection_id()], msg);
Ok(())
}
}