1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use crate::RequestContext;
use dbui_core::{Project, RequestMessage, ResponseMessage, Result, ResultExt};

/// Handles incoming `RequestMessage`s for one `Project` on behalf of one `RequestContext`.
#[derive(Debug)]
pub struct MessageHandler {
  /// Project whose description is reported on open and whose key selects the database to connect to.
  project: Project,
  /// Request context supplying the application handle, the current user, and the parent logger.
  request: RequestContext,
  /// Child of the request's logger, tagged with `"service" => "message_handler"` in `new`.
  log: slog::Logger
}

impl MessageHandler {
  pub fn new(project: Project, request: RequestContext) -> MessageHandler {
    let log = request.log().new(slog::o!("service" => "message_handler"));
    MessageHandler { project, request, log }
  }

  pub fn on_open(&self) -> Vec<ResponseMessage> {
    vec![ResponseMessage::Connected {
      p: Box::new(self.project.to_description()),
      u: Box::new((*self.request.user()).clone())
    }]
  }

  pub fn on_closed(&self) -> Vec<ResponseMessage> {
    Vec::new()
  }

  pub fn on_message(&self, msg: RequestMessage) -> Result<Vec<ResponseMessage>> {
    let mut ret = Vec::new();
    match msg {
      RequestMessage::Ping { v } => ret.push(ResponseMessage::Pong { v }),
      RequestMessage::RunQuery { id, sql } => ret.push(self.run_query(id, &sql)?),
      RequestMessage::RefreshSchema { full } => ret.push(self.refresh_schema(full)?),
      msg => slog::warn!(self.log, "Unhandled RequestMessage [{:?}]", msg)
    }
    Ok(ret)
  }

  pub fn on_error(&self) {}

  pub fn log(&self) -> &slog::Logger {
    &self.log
  }

  fn run_sql(&self, id: uuid::Uuid, sql: &str) -> Result<Vec<postgres::row::Row>> {
    let mut conn = self.request.app().open_db(self.project.key())?;
    let stmt = conn.prepare(sql).chain_err(|| format!("Cannot parse query [{}]", id))?;
    conn.query(&stmt, &[]).chain_err(|| format!("Cannot run query [{}]", id))
  }

  fn run_query(&self, id: uuid::Uuid, sql: &str) -> Result<ResponseMessage> {
    slog::info!(self.log(), "Running SQL as [{}]: {}", id, sql);
    let start_ms = dbui_core::util::current_time_ms();
    let rows = self.run_sql(id, sql)?;
    let duration_ms = (dbui_core::util::current_time_ms() - start_ms) as i32;
    let result = dbui_database::from_rows(id, sql, rows, duration_ms)?;
    Ok(ResponseMessage::SqlResponse { result })
  }

  fn refresh_schema(&self, full: bool) -> Result<ResponseMessage> {
    let conn = self.request.app().open_db(self.project.key())?;
    dbui_database::schema::refresh::refresh_schema(&self.project, conn, self.log(), full)
  }
}