use std::sync::Arc;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use crate::control::security::identity::AuthenticatedIdentity;
use super::super::types::text_field;
use super::core::NodeDbPgHandler;
impl NodeDbPgHandler {
pub(super) fn handle_live_select(
&self,
identity: &AuthenticatedIdentity,
addr: &std::net::SocketAddr,
sql: &str,
) -> PgWireResult<Vec<Response>> {
let coll_name = super::super::ddl::sql_parse::extract_collection_after(sql, " FROM ")
.ok_or_else(|| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"42601".to_owned(),
"syntax: LIVE SELECT [*|fields] FROM <collection> [WHERE ...]".to_owned(),
)))
})?;
let tenant_id = identity.tenant_id;
let sub = self
.state
.change_stream
.subscribe(Some(coll_name.clone()), Some(tenant_id));
let sub_id = sub.id;
let channel = format!("live_{coll_name}");
self.sessions
.add_live_subscription(addr, channel.clone(), sub);
tracing::info!(
sub_id,
collection = coll_name,
channel,
"LIVE SELECT subscription created"
);
use futures::stream;
let schema = Arc::new(vec![
text_field("subscription_id"),
text_field("channel"),
text_field("collection"),
text_field("status"),
]);
let mut encoder = DataRowEncoder::new(schema.clone());
let _ = encoder.encode_field(&sub_id.to_string());
let _ = encoder.encode_field(&channel);
let _ = encoder.encode_field(&coll_name);
let _ = encoder.encode_field(&"active");
let row = encoder.take_row();
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(vec![Ok(row)]),
))])
}
}