use serde_json::Value;
use crate::error::{Result, SurqlError};
use crate::types::operators::quote_value_public;
use super::builder::{table_part, validate_identifier};
#[cfg(any(feature = "client", feature = "client-rustls"))]
use crate::connection::DatabaseClient;
#[cfg(any(feature = "client", feature = "client-rustls"))]
use crate::query::executor::flatten_rows;
fn format_item_for_surql(item: &Value) -> Result<String> {
let obj = item.as_object().ok_or_else(|| SurqlError::Validation {
reason: "Batch items must be JSON objects".to_string(),
})?;
let mut parts: Vec<String> = Vec::with_capacity(obj.len());
for (key, value) in obj {
validate_identifier(key, "field name")?;
parts.push(format!("{key}: {}", quote_value_public(value)));
}
Ok(format!("{{ {} }}", parts.join(", ")))
}
fn format_items_array(items: &[Value]) -> Result<String> {
let mut lines: Vec<String> = Vec::with_capacity(items.len());
for item in items {
lines.push(format_item_for_surql(item)?);
}
Ok(format!("[\n {}\n]", lines.join(",\n ")))
}
fn render_set_clause(data: &serde_json::Map<String, Value>) -> Result<String> {
let mut parts: Vec<String> = Vec::with_capacity(data.len());
for (key, value) in data {
validate_identifier(key, "field name")?;
parts.push(format!("{key} = {}", quote_value_public(value)));
}
Ok(parts.join(", "))
}
pub fn build_upsert_query(
table: &str,
items: &[Value],
conflict_fields: Option<&[String]>,
) -> Result<String> {
if items.is_empty() {
return Ok(String::new());
}
validate_identifier(table, "table name")?;
if let Some(fields) = conflict_fields {
for f in fields {
validate_identifier(f, "conflict field name")?;
}
}
let items_array = format_items_array(items)?;
if let Some(fields) = conflict_fields {
if !fields.is_empty() {
let conditions = fields
.iter()
.map(|f| format!("{f} = $item.{f}"))
.collect::<Vec<_>>()
.join(" AND ");
return Ok(format!(
"UPSERT INTO {table} {items_array} WHERE {conditions};"
));
}
}
Ok(format!("UPSERT INTO {table} {items_array};"))
}
pub fn build_relate_query(
from_id: &str,
edge: &str,
to_id: &str,
data: Option<&serde_json::Map<String, Value>>,
) -> Result<String> {
validate_identifier(edge, "edge table name")?;
validate_identifier(table_part(from_id), "from record table")?;
validate_identifier(table_part(to_id), "to record table")?;
let mut stmt = format!("RELATE {from_id}->{edge}->{to_id}");
if let Some(data) = data {
if !data.is_empty() {
let set_clause = render_set_clause(data)?;
stmt.push_str(" SET ");
stmt.push_str(&set_clause);
}
}
stmt.push(';');
Ok(stmt)
}
#[cfg(any(feature = "client", feature = "client-rustls"))]
pub async fn upsert_many(
client: &DatabaseClient,
table: &str,
items: Vec<Value>,
conflict_fields: Option<&[String]>,
) -> Result<Vec<Value>> {
if items.is_empty() {
return Ok(Vec::new());
}
validate_identifier(table, "table name")?;
if let Some(fields) = conflict_fields {
for f in fields {
validate_identifier(f, "conflict field name")?;
}
}
let mut rows: Vec<Value> = Vec::with_capacity(items.len());
for mut item in items {
let target = if let Some(obj) = item.as_object_mut() {
match obj
.remove("id")
.and_then(|v| v.as_str().map(ToOwned::to_owned))
{
Some(id) => id,
None => table.to_string(),
}
} else {
table.to_string()
};
validate_identifier(table_part(&target), "record ID table")?;
let mut vars = std::collections::BTreeMap::new();
vars.insert("data".to_owned(), item);
let surql = format!("UPSERT {target} CONTENT $data");
let raw = client.query_with_vars(&surql, vars).await?;
rows.extend(flatten_rows(&raw));
}
Ok(rows)
}
#[cfg(any(feature = "client", feature = "client-rustls"))]
pub async fn insert_many(
client: &DatabaseClient,
table: &str,
items: Vec<Value>,
) -> Result<Vec<Value>> {
if items.is_empty() {
return Ok(Vec::new());
}
validate_identifier(table, "table name")?;
let items_array = format_items_array(&items)?;
let surql = format!("INSERT INTO {table} {items_array};");
let raw = client.query(&surql).await?;
Ok(flatten_rows(&raw))
}
#[derive(Debug, Clone, Default)]
pub struct RelateItem {
pub from: String,
pub to: String,
pub data: Option<serde_json::Map<String, Value>>,
}
impl RelateItem {
pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
Self {
from: from.into(),
to: to.into(),
data: None,
}
}
pub fn with_data(mut self, data: serde_json::Map<String, Value>) -> Self {
self.data = Some(data);
self
}
}
#[cfg(any(feature = "client", feature = "client-rustls"))]
pub async fn relate_many(
client: &DatabaseClient,
from_table: &str,
edge: &str,
to_table: &str,
relations: Vec<RelateItem>,
) -> Result<Vec<Value>> {
if relations.is_empty() {
return Ok(Vec::new());
}
validate_identifier(from_table, "from table name")?;
validate_identifier(edge, "edge table name")?;
validate_identifier(to_table, "to table name")?;
let mut stmts: Vec<String> = Vec::with_capacity(relations.len());
for rel in &relations {
stmts.push(build_relate_query(
&rel.from,
edge,
&rel.to,
rel.data.as_ref(),
)?);
}
let surql = stmts.join("\n");
let raw = client.query(&surql).await?;
Ok(flatten_rows(&raw))
}
#[cfg(any(feature = "client", feature = "client-rustls"))]
pub async fn delete_many(
client: &DatabaseClient,
table: &str,
ids: Vec<String>,
) -> Result<Vec<Value>> {
if ids.is_empty() {
return Ok(Vec::new());
}
validate_identifier(table, "table name")?;
let mut rows: Vec<Value> = Vec::new();
for record_id in ids {
if record_id.contains(':') {
validate_identifier(table_part(&record_id), "record ID table")?;
}
let target = if record_id.contains(':') {
record_id.clone()
} else {
format!("{table}:{record_id}")
};
let surql = format!("DELETE {target} RETURN BEFORE;");
let raw = client.query(&surql).await?;
rows.extend(flatten_rows(&raw));
}
Ok(rows)
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn build_upsert_query_renders_array_literal() {
let items = vec![
json!({"id": "user:1", "name": "Alice"}),
json!({"id": "user:2", "name": "Bob"}),
];
let sql = build_upsert_query("user", &items, None).unwrap();
assert!(sql.starts_with("UPSERT INTO user ["));
assert!(sql.contains("id: 'user:1'"));
assert!(sql.contains("name: 'Alice'"));
assert!(sql.ends_with("];"));
}
#[test]
fn build_upsert_query_appends_where_clause_for_conflict_fields() {
let items = vec![json!({"email": "a@x.com", "name": "Alice"})];
let fields = vec!["email".to_string()];
let sql = build_upsert_query("user", &items, Some(&fields)).unwrap();
assert!(sql.contains("WHERE email = $item.email"));
}
#[test]
fn build_upsert_query_returns_empty_string_for_empty_items() {
let sql = build_upsert_query("user", &[], None).unwrap();
assert!(sql.is_empty());
}
#[test]
fn build_upsert_query_rejects_invalid_identifier() {
let items = vec![json!({"bad field": 1})];
let err = build_upsert_query("user", &items, None).unwrap_err();
assert!(matches!(err, SurqlError::Validation { .. }));
}
#[test]
fn build_relate_query_includes_set_clause() {
let mut data = serde_json::Map::new();
data.insert("since".into(), json!("2024-01-01"));
let sql = build_relate_query("person:alice", "knows", "person:bob", Some(&data)).unwrap();
assert_eq!(
sql,
"RELATE person:alice->knows->person:bob SET since = '2024-01-01';"
);
}
#[test]
fn build_relate_query_without_data() {
let sql = build_relate_query("person:alice", "knows", "person:bob", None).unwrap();
assert_eq!(sql, "RELATE person:alice->knows->person:bob;");
}
#[test]
fn build_relate_query_rejects_invalid_edge() {
let err = build_relate_query("person:alice", "bad edge", "person:bob", None).unwrap_err();
assert!(matches!(err, SurqlError::Validation { .. }));
}
#[test]
fn format_item_for_surql_handles_nested_array() {
let item = json!({"tags": ["a", "b"]});
let rendered = format_item_for_surql(&item).unwrap();
assert_eq!(rendered, "{ tags: ['a', 'b'] }");
}
#[test]
fn format_item_for_surql_rejects_non_object() {
let item = json!([1, 2, 3]);
let err = format_item_for_surql(&item).unwrap_err();
assert!(matches!(err, SurqlError::Validation { .. }));
}
}