use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::event::cdc::stream_def::{
ChangeStreamDef, CompactionConfig, LateDataPolicy, OpFilter, RetentionConfig, StreamFormat,
};
use crate::event::webhook::WebhookConfig;
use super::super::super::types::{require_tenant_admin, sqlstate_error};
pub fn create_change_stream(
state: &SharedState,
identity: &AuthenticatedIdentity,
name: &str,
collection: &str,
with_clause_raw: &str,
) -> PgWireResult<Vec<Response>> {
require_tenant_admin(identity, "create change streams")?;
if state.event_plane_budget.should_reject_new_streams() {
return Err(sqlstate_error(
"53000",
"Event Plane memory budget exceeded — cannot create new change streams. \
Existing streams continue with reduced retention.",
));
}
let tenant_id = identity.tenant_id.as_u64();
let catalog = state
.credentials
.catalog()
.as_ref()
.ok_or_else(|| sqlstate_error("XX000", "system catalog not available"))?;
if let Ok(Some(_)) = catalog.get_change_stream(tenant_id, name) {
return Err(sqlstate_error(
"42710",
&format!("change stream '{name}' already exists"),
));
}
let kv_pairs: Vec<(String, String)> = if with_clause_raw.is_empty() {
Vec::new()
} else {
extract_key_value_pairs(with_clause_raw)
};
let mut op_filter = OpFilter::all();
let mut format = StreamFormat::Json;
let mut compaction = CompactionConfig::default();
let mut webhook = WebhookConfig::default();
let mut late_data = LateDataPolicy::default();
for (key, val) in &kv_pairs {
match key.as_str() {
"FORMAT" => {
if let Some(f) = StreamFormat::from_str_opt(val) {
format = f;
}
}
"INCLUDE" => {
op_filter = OpFilter {
insert: false,
update: false,
delete: false,
};
for op in val.split(',') {
match op.trim().to_uppercase().as_str() {
"INSERT" => op_filter.insert = true,
"UPDATE" => op_filter.update = true,
"DELETE" => op_filter.delete = true,
_ => {}
}
}
}
"COMPACTION" if val.eq_ignore_ascii_case("key") => {
compaction.enabled = true;
}
"KEY" if !val.is_empty() => {
compaction.key_field = val.clone();
compaction.enabled = true;
}
"URL" if !val.is_empty() => {
webhook.url = val.clone();
}
"RETRY" => {
webhook.max_retries = val.parse().unwrap_or(3);
}
"TIMEOUT" => {
let secs = val
.strip_suffix('s')
.or(Some(val))
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(5);
webhook.timeout_secs = secs;
}
"LATE_DATA" => {
if let Some(policy) = LateDataPolicy::from_str_opt(val) {
late_data = policy;
}
}
_ => {}
}
}
let kafka =
crate::event::kafka::KafkaDeliveryConfig::from_with_params(&kv_pairs).unwrap_or_default();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|_| sqlstate_error("XX000", "system clock before UNIX epoch"))?
.as_secs();
let def = ChangeStreamDef {
tenant_id,
name: name.to_string(),
collection: collection.to_string(),
op_filter,
format,
retention: RetentionConfig::default(),
compaction,
webhook,
late_data,
kafka,
owner: identity.username.clone(),
created_at: now,
};
let has_webhook = def.webhook.is_configured();
let webhook_config = def.webhook.clone();
let kafka_config = def.kafka.clone();
let entry = crate::control::catalog_entry::CatalogEntry::PutChangeStream(Box::new(def.clone()));
let log_index = super::super::catalog_propose::propose_and_apply(state, &entry)?;
if log_index == 0 {
state.stream_registry.register(def.clone());
}
if has_webhook {
state
.webhook_manager
.start_task(tenant_id, name, webhook_config);
}
if kafka_config.enabled {
state.kafka_manager.start(tenant_id, name, kafka_config);
}
state.audit_record(
crate::control::security::audit::AuditEvent::AdminAction,
Some(identity.tenant_id),
&identity.username,
&format!("CREATE CHANGE STREAM {name} ON {collection}"),
);
Ok(vec![Response::Execution(Tag::new("CREATE CHANGE STREAM"))])
}
fn extract_key_value_pairs(inner: &str) -> Vec<(String, String)> {
let mut result = Vec::new();
let mut pairs = Vec::new();
let mut current = String::new();
let mut in_quote = false;
for ch in inner.chars() {
if ch == '\'' && !in_quote {
in_quote = true;
current.push(ch);
} else if ch == '\'' && in_quote {
in_quote = false;
current.push(ch);
} else if ch == ',' && !in_quote {
pairs.push(std::mem::take(&mut current));
} else {
current.push(ch);
}
}
if !current.is_empty() {
pairs.push(current);
}
for pair in pairs {
let pair = pair.trim().to_string();
if let Some((key, value)) = pair.split_once('=') {
let key = key.trim().to_uppercase();
let value = value
.trim()
.trim_matches('\'')
.trim_matches('"')
.to_string();
result.push((key, value));
}
}
result
}