1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use crate::RequestContext;

use anyhow::{Context, Result};
use dbui_core::{Project, RequestMessage, ResponseMessage};
use uuid::Uuid;

/// Core application logic, routing [RequestMessage](dbui_core::RequestMessage)s and emitting [ResponseMessage](dbui_core::ResponseMessage)s.
#[derive(Debug)]
pub struct MessageHandler {
  connection_id: Uuid,
  project: Project,
  ctx: RequestContext,
  log: slog::Logger
}

impl MessageHandler {
  pub fn new(connection_id: Uuid, project: Project, ctx: RequestContext) -> MessageHandler {
    let log = ctx
      .log()
      .new(slog::o!("connection" => format!("{}", connection_id), "project" => project.key().clone(), "service" => "message_handler"));
    MessageHandler {
      connection_id,
      project,
      ctx,
      log
    }
  }

  pub fn connection_id(&self) -> &Uuid {
    &self.connection_id
  }

  pub fn project(&self) -> &Project {
    &self.project
  }

  pub fn ctx(&self) -> &RequestContext {
    &self.ctx
  }

  pub fn on_open(&self) -> Result<Vec<ResponseMessage>> {
    Ok(vec![ResponseMessage::Connected {
      connection_id: *self.connection_id(),
      user_id: *self.ctx().user_id(),
      p: Box::new(self.project.to_description()),
      u: Box::new((*self.ctx.user_profile()).clone()),
      b: !self.ctx.app().verbose()
    }])
  }

  pub fn on_closed(&self) -> Vec<ResponseMessage> {
    Vec::new()
  }

  pub fn on_message(&self, msg: RequestMessage) -> Result<()> {
    match msg {
      RequestMessage::Ping { v } => self.send_to_self(ResponseMessage::Pong { v }),
      RequestMessage::RunQuery { id, ctx, sql } => self.run_query(id, &ctx, &sql),
      RequestMessage::RefreshSchema { full } => self.refresh_schema(full),
      msg => {
        slog::warn!(self.log, "Unhandled RequestMessage [{:?}]", msg);
        Ok(())
      }
    }
  }

  pub fn on_error(&self) {}

  pub fn log(&self) -> &slog::Logger {
    &self.log
  }

  fn run_sql(&self, id: Uuid, sql: &str) -> Result<Vec<postgres::row::Row>> {
    let mut conn = self.ctx.app().open_db(self.project.key())?;
    let stmt = conn.prepare(sql).with_context(|| format!("Cannot parse query [{}]", id))?;
    conn.query(&stmt, &[]).with_context(|| format!("Cannot run query [{}]", id))
  }

  fn run_query(&self, id: Uuid, ctx: &str, sql: &str) -> Result<()> {
    slog::info!(self.log(), "Running SQL as [{}/{}]: {}", ctx, id, sql);
    let start_ms = dbui_core::util::current_time_ms();
    let rows = self.run_sql(id, sql)?;
    let duration_ms = (dbui_core::util::current_time_ms() - start_ms) as i32;
    let result = dbui_database::from_rows(id, ctx, sql, rows, duration_ms)?;
    self.send_to_self(ResponseMessage::SqlResponse { result })
  }

  fn refresh_schema(&self, full: bool) -> Result<()> {
    let conn = self.ctx.app().open_db(self.project.key())?;
    self.send_to_self(dbui_database::schema::refresh::refresh_schema(
      &self.project,
      conn,
      self.log(),
      full
    )?)
  }

  fn send_to_self(&self, msg: ResponseMessage) -> Result<()> {
    self.ctx().app().send_connection(self.connection_id(), msg);
    Ok(())
  }

  fn _send_to_channel(&self, msg: ResponseMessage) -> Result<()> {
    self.ctx().app().send_channel(self.project().key(), msg);
    Ok(())
  }

  fn _send_to_channel_except_self(&self, msg: ResponseMessage) -> Result<()> {
    self
      .ctx()
      .app()
      .send_channel_except(self.project().key(), vec![self.connection_id()], msg);
    Ok(())
  }
}