use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::data::executor::response_codec::decode_payload_to_json;
use crate::types::TraceId;
use super::super::super::types::sqlstate_error;
pub async fn refresh_materialized_view(
state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
if parts.len() < 4 {
return Err(sqlstate_error(
"42601",
"syntax: REFRESH MATERIALIZED VIEW <name>",
));
}
let name = parts[3].to_lowercase();
let tenant_id = identity.tenant_id;
let view = if let Some(catalog) = state.credentials.catalog() {
match catalog.get_materialized_view(tenant_id.as_u64(), &name) {
Ok(Some(v)) => v,
Ok(None) => {
return Err(sqlstate_error(
"42P01",
&format!("materialized view '{name}' does not exist"),
));
}
Err(e) => return Err(sqlstate_error("XX000", &e.to_string())),
}
} else {
return Err(sqlstate_error("XX000", "catalog unavailable"));
};
let rows = execute_select(state, tenant_id, &view.query_sql).await?;
dispatch_sql(state, tenant_id, &format!("DELETE FROM {}", view.name)).await?;
for (idx, row) in rows.iter().enumerate() {
let mut row = row.clone();
if !row.contains_key("id") {
row.insert(
"id".to_string(),
serde_json::Value::String(format!("mv_{idx}")),
);
}
let insert_sql = build_insert_sql(&view.name, &row)?;
dispatch_sql(state, tenant_id, &insert_sql).await?;
}
tracing::info!(
view = view.name,
rows = rows.len(),
"materialized view refreshed"
);
Ok(vec![Response::Execution(Tag::new(
"REFRESH MATERIALIZED VIEW",
))])
}
async fn execute_select(
state: &SharedState,
tenant_id: nodedb_types::TenantId,
sql: &str,
) -> PgWireResult<Vec<serde_json::Map<String, serde_json::Value>>> {
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
let tasks = query_ctx
.plan_sql(sql, tenant_id, crate::types::DatabaseId::DEFAULT)
.await
.map_err(|e| sqlstate_error("XX000", &format!("plan '{sql}': {e}")))?;
let mut rows: Vec<serde_json::Map<String, serde_json::Value>> = Vec::new();
for task in tasks {
let response = crate::control::server::dispatch_utils::dispatch_to_data_plane(
state,
tenant_id,
task.vshard_id,
task.plan,
TraceId::ZERO,
)
.await
.map_err(|e| sqlstate_error("XX000", &format!("dispatch: {e}")))?;
let payload = response.payload.as_ref();
if payload.is_empty() {
continue;
}
let json = decode_payload_to_json(payload);
if json.is_empty() {
continue;
}
let parsed: serde_json::Value = sonic_rs::from_str(&json)
.map_err(|e| sqlstate_error("XX000", &format!("decode scan payload: {e}")))?;
collect_rows(parsed, &mut rows);
}
Ok(rows)
}
fn collect_rows(
value: serde_json::Value,
out: &mut Vec<serde_json::Map<String, serde_json::Value>>,
) {
match value {
serde_json::Value::Array(arr) => {
for v in arr {
collect_rows(v, out);
}
}
serde_json::Value::Object(mut obj) => {
if obj.len() == 2
&& obj.contains_key("id")
&& matches!(obj.get("data"), Some(serde_json::Value::Object(_)))
{
if let Some(serde_json::Value::Object(inner)) = obj.remove("data") {
out.push(inner);
}
} else {
out.push(obj);
}
}
_ => {}
}
}
fn build_insert_sql(
target: &str,
row: &serde_json::Map<String, serde_json::Value>,
) -> PgWireResult<String> {
if row.is_empty() {
return Err(sqlstate_error(
"XX000",
"materialized view SELECT produced an empty row (no columns)",
));
}
let mut cols: Vec<String> = Vec::with_capacity(row.len());
let mut vals: Vec<String> = Vec::with_capacity(row.len());
for (k, v) in row {
cols.push(format!("\"{}\"", k.replace('"', "\"\"")));
vals.push(json_value_to_sql_literal(v)?);
}
Ok(format!(
"INSERT INTO {} ({}) VALUES ({})",
target,
cols.join(", "),
vals.join(", ")
))
}
fn json_value_to_sql_literal(v: &serde_json::Value) -> PgWireResult<String> {
Ok(match v {
serde_json::Value::Null => "NULL".into(),
serde_json::Value::Bool(b) => if *b { "TRUE" } else { "FALSE" }.into(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
let s = sonic_rs::to_string(v)
.map_err(|e| sqlstate_error("XX000", &format!("encode nested value: {e}")))?;
format!("'{}'", s.replace('\'', "''"))
}
})
}
async fn dispatch_sql(
state: &SharedState,
tenant_id: nodedb_types::TenantId,
sql: &str,
) -> PgWireResult<()> {
let query_ctx = crate::control::planner::context::QueryContext::for_state(state);
let tasks = query_ctx
.plan_sql(sql, tenant_id, crate::types::DatabaseId::DEFAULT)
.await
.map_err(|e| sqlstate_error("42P20", &format!("plan '{sql}': {e}")))?;
for task in tasks {
crate::control::server::wal_dispatch::wal_append_if_write(
&state.wal,
tenant_id,
task.vshard_id,
task.database_id,
&task.plan,
)
.map_err(|e| sqlstate_error("58030", &format!("wal append: {e}")))?;
crate::control::server::dispatch_utils::dispatch_to_data_plane(
state,
tenant_id,
task.vshard_id,
task.plan,
TraceId::ZERO,
)
.await
.map_err(|e| sqlstate_error("08006", &format!("dispatch: {e}")))?;
}
Ok(())
}