use crate::algebra::{AggregateType, CompareOp, Expr, JoinCondition, Operand, Predicate};
use crate::backends::{Backend, QueryPlan};
use crate::schema::{Column, DataType, ResultSet, Row, Schema, Value};
use crate::{RealError, Result};
/// Encodes `bytes` as standard base64 (RFC 4648 alphabet) with `=` padding.
///
/// Each input group of up to three bytes is packed into a 24-bit word and
/// emitted as four 6-bit symbols; missing trailing bytes become `=`.
fn base64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        let present = group.len();
        // Absent bytes count as zero while building the 24-bit word.
        let word = (u32::from(group[0]) << 16)
            | (u32::from(*group.get(1).unwrap_or(&0)) << 8)
            | u32::from(*group.get(2).unwrap_or(&0));
        let symbol = |shift: u32| char::from(ALPHABET[((word >> shift) & 0x3f) as usize]);
        encoded.push(symbol(18));
        encoded.push(symbol(12));
        encoded.push(if present >= 2 { symbol(6) } else { '=' });
        encoded.push(if present == 3 { symbol(0) } else { '=' });
    }
    encoded
}
/// Query backend that compiles relational-algebra expressions into M code
/// that traverses YottaDB globals.
///
/// A relation must be registered with `register_global` before it can be
/// compiled or have its schema looked up; unknown names yield
/// `RealError::Schema`.
pub struct YottaDBBackend {
    /// Relation name -> the global (and key layout) that stores it.
    globals: std::collections::HashMap<String, GlobalMapping>,
}
/// Describes how one relation is stored in a YottaDB global.
#[derive(Debug, Clone)]
pub struct GlobalMapping {
    /// Global name exactly as it should appear in generated M code, including
    /// the leading `^` (e.g. `^Users`); it is spliced in verbatim before the
    /// subscript list.
    pub global: String,
    /// Schema returned for the relation by `get_schema`.
    pub schema: Schema,
    /// How rows are keyed (subscripted) inside the global.
    pub key_structure: KeyStructure,
}
/// Subscript layout of a mapped global.
#[derive(Debug, Clone)]
pub enum KeyStructure {
    /// A single subscript level holds the row key: column values live at
    /// `global(id,"column")`, and `id_column` names the schema column whose
    /// value is that subscript (selection does not read it from a node).
    Simple { id_column: String },
    /// Several key columns, presumably one subscript level each.
    /// NOTE(review): selection over compound keys currently only emits
    /// placeholder comments; no traversal code is generated.
    Compound { key_columns: Vec<String> },
}
/// Connection handle for the YottaDB backend.
///
/// Currently an empty placeholder; `execute` ignores it.
pub struct YDBConnection {
    _placeholder: (),
}
/// A query compiled for the YottaDB backend.
#[derive(Debug, Clone)]
pub struct YDBQuery {
    /// Generated M source text, newline-separated.
    pub m_code: String,
    /// Schema describing the rows the generated code writes.
    pub result_schema: Schema,
}
impl YottaDBBackend {
    /// Creates a backend with no globals registered.
    pub fn new() -> Self {
        Self {
            globals: std::collections::HashMap::new(),
        }
    }

    /// Maps `relation` onto the global described by `mapping`, replacing any
    /// previous mapping registered under the same relation name.
    pub fn register_global(&mut self, relation: String, mapping: GlobalMapping) {
        self.globals.insert(relation, mapping);
    }

    /// Compiles a selection (`relation WHERE predicate`) into M code.
    ///
    /// For [`KeyStructure::Simple`] mappings the generated code walks the first
    /// subscript level of the global with `$ORDER`, loads every non-key column
    /// from `global(id,"column")` into a local variable of the same name,
    /// exposes the subscript under the key column's name, evaluates the
    /// compiled predicate and writes matching rows as `id|col|...` followed by
    /// a newline. Compound keys only produce placeholder comments for now.
    ///
    /// # Errors
    ///
    /// Returns `RealError::Schema` if `relation` is not registered, and
    /// propagates any error from predicate compilation.
    fn compile_select(
        &self,
        relation: &str,
        predicate: &Predicate,
        schema: &Schema,
    ) -> Result<String> {
        let mapping = self
            .globals
            .get(relation)
            .ok_or_else(|| RealError::Schema(format!("Unknown relation: {}", relation)))?;
        let mut m_code = format!("; Selection from {}\n", mapping.global);
        match &mapping.key_structure {
            KeyStructure::Simple { id_column } => {
                // NEW the traversal subscript and every column variable so the
                // caller's locals survive; list `id` only once even when a
                // column is itself named `id`.
                let locals = std::iter::once("id")
                    .chain(
                        schema
                            .columns
                            .iter()
                            .map(|c| c.name.as_str())
                            .filter(|name| *name != "id"),
                    )
                    .collect::<Vec<_>>();
                m_code.push_str(&format!("NEW {}\n", locals.join(",")));
                m_code.push_str("SET id=\"\"\n");
                // Argumentless FOR and QUIT must be followed by two spaces in M.
                m_code.push_str(&format!(
                    "FOR  SET id=$ORDER({}(id)) QUIT:id=\"\"  DO\n",
                    mapping.global
                ));
                for col in &schema.columns {
                    if col.name != *id_column {
                        m_code.push_str(&format!(
                            ". SET {}=$GET({}(id,\"{}\"))\n",
                            col.name, mapping.global, col.name
                        ));
                    } else if col.name != "id" {
                        // The key is the subscript itself, not a node; copy it into
                        // the key column's variable so predicates can reference it.
                        m_code.push_str(&format!(". SET {}=id\n", col.name));
                    }
                }
                let predicate_code = self.compile_predicate(predicate)?;
                m_code.push_str(&format!(". IF {} DO\n", predicate_code));
                m_code.push_str(". . ; Emit result\n");
                m_code.push_str(". . WRITE id");
                for col in schema.columns.iter().filter(|c| c.name != *id_column) {
                    m_code.push_str(&format!(",\"|\",{}", col.name));
                }
                m_code.push_str(",!\n");
            }
            KeyStructure::Compound { .. } => {
                // TODO: generate nested $ORDER loops, one level per key column.
                m_code.push_str("; Compound key traversal\n");
                m_code.push_str("; (Implementation similar but with nested $ORDER)\n");
            }
        }
        Ok(m_code)
    }

    /// Translates `pred` into an M truth-valued expression.
    ///
    /// M evaluates binary operators strictly left to right with no precedence,
    /// so every compound operand is wrapped in parentheses.
    ///
    /// # Errors
    ///
    /// Currently never fails; the `Result` leaves room for predicates that
    /// cannot be expressed in M.
    fn compile_predicate(&self, pred: &Predicate) -> Result<String> {
        match pred {
            Predicate::Compare { left, op, right } => {
                // `'>` ("not greater") and `'<` ("not less") are the standard M
                // spellings of <= and >=; `<=`/`>=` are not standard M operators.
                let op_code = match op {
                    CompareOp::Eq => "=",
                    CompareOp::NotEq => "'=",
                    CompareOp::Lt => "<",
                    CompareOp::Lte => "'>",
                    CompareOp::Gt => ">",
                    CompareOp::Gte => "'<",
                };
                Ok(format!(
                    "{}{}{}",
                    left.name,
                    op_code,
                    self.value_to_m_code(right)
                ))
            }
            Predicate::And(left, right) => Ok(format!(
                "({})&({})",
                self.compile_predicate(left)?,
                self.compile_predicate(right)?
            )),
            // `!` is M's logical OR; `||` is not an M operator.
            Predicate::Or(left, right) => Ok(format!(
                "({})!({})",
                self.compile_predicate(left)?,
                self.compile_predicate(right)?
            )),
            Predicate::Not(inner) => Ok(format!("'({})", self.compile_predicate(inner)?)),
            Predicate::In { column, values } => {
                let checks = values
                    .iter()
                    .map(|v| format!("({}={})", column.name, self.value_to_m_literal(v)))
                    .collect::<Vec<_>>();
                if checks.is_empty() {
                    // An empty IN list never matches; 0 is false in M.
                    Ok("0".to_string())
                } else {
                    Ok(format!("({})", checks.join("!")))
                }
            }
            Predicate::Like { column, pattern } => {
                let m_pattern = Self::like_to_m_pattern(pattern);
                if m_pattern.is_empty() {
                    // M has no empty pattern; an empty LIKE pattern only matches "".
                    Ok(format!("{}=\"\"", column.name))
                } else {
                    Ok(format!("{}?{}", column.name, m_pattern))
                }
            }
            Predicate::IsNull(column) => Ok(format!("{}=\"\"", column.name)),
            Predicate::Between { column, low, high } => Ok(format!(
                "({}'<{})&({}'>{})",
                column.name,
                self.value_to_m_literal(low),
                column.name,
                self.value_to_m_literal(high)
            )),
        }
    }

    /// Converts an SQL `LIKE` pattern into M pattern-match atoms.
    ///
    /// `%` becomes `.E` (any number of any characters), `_` becomes `1E`
    /// (exactly one character), and each run of literal text becomes a
    /// `1"..."` atom with embedded quotes doubled. An empty input yields an
    /// empty string, which the caller must handle.
    fn like_to_m_pattern(pattern: &str) -> String {
        // Appends the pending literal run as a quoted atom and clears it.
        let flush = |atoms: &mut String, literal: &mut String| {
            if !literal.is_empty() {
                atoms.push_str("1\"");
                atoms.push_str(&literal.replace('"', "\"\""));
                atoms.push('"');
                literal.clear();
            }
        };
        let mut atoms = String::new();
        let mut literal = String::new();
        for ch in pattern.chars() {
            let wildcard = match ch {
                '%' => ".E",
                '_' => "1E",
                other => {
                    literal.push(other);
                    continue;
                }
            };
            flush(&mut atoms, &mut literal);
            atoms.push_str(wildcard);
        }
        flush(&mut atoms, &mut literal);
        atoms
    }

    /// Renders an operand: a column becomes its variable name, a literal
    /// becomes an M literal.
    fn value_to_m_code(&self, operand: &Operand) -> String {
        match operand {
            Operand::Column(col) => col.name.clone(),
            Operand::Literal(v) => self.value_to_m_literal(v),
        }
    }

    /// Wraps `raw` in double quotes as an M string literal, doubling any
    /// embedded `"` (M's escape for a quote inside a string).
    fn quote_m_string(raw: &str) -> String {
        format!("\"{}\"", raw.replace('"', "\"\""))
    }

    /// Renders a value as an M literal.
    ///
    /// Text-like values are quoted with embedded quotes escaped; NULL becomes
    /// the empty string; booleans become `1`/`0`; bytes are base64-encoded.
    /// Arrays and vectors have no M representation and become fixed markers.
    fn value_to_m_literal(&self, value: &Value) -> String {
        match value {
            Value::String(s) => Self::quote_m_string(s),
            Value::Integer(i) => i.to_string(),
            Value::Boolean(b) => if *b { "1" } else { "0" }.to_string(),
            Value::Float(f) => f.to_string(),
            Value::Null => "\"\"".to_string(),
            Value::Bytes(b) => format!("\"{}\"", base64_encode(b)),
            Value::Timestamp(ts) => ts.to_string(),
            Value::Decimal(d) => d.clone(),
            Value::Json(j) => Self::quote_m_string(&j.to_string()),
            Value::Array(_) => "\"[array]\"".to_string(),
            Value::Vector(_) => "\"[vector]\"".to_string(),
        }
    }
}
impl Default for YottaDBBackend {
fn default() -> Self {
Self::new()
}
}
impl Backend for YottaDBBackend {
type Connection = YDBConnection;
type CompiledQuery = YDBQuery;
fn compile(&self, expr: &Expr) -> Result<Self::CompiledQuery> {
match expr {
Expr::Relation { name, schema } => {
let mapping = self
.globals
.get(name)
.ok_or_else(|| RealError::Schema(format!("Unknown relation: {}", name)))?;
let mut code = format!("; Scan {}\n", mapping.global);
code.push_str(&format!("SET id=\"\"\n"));
code.push_str(&format!("FOR SET id=$ORDER({}(id)) QUIT:id=\"\" DO\n", mapping.global));
for col in &schema.columns {
code.push_str(&format!(". WRITE $GET({}(id,\"{}\")),\"|\"\n", mapping.global, col.name));
}
code.push_str(". WRITE !\n");
Ok(YDBQuery {
m_code: code,
result_schema: schema.clone(),
})
}
Expr::Select { input, predicate } => {
if let Expr::Relation { name, schema } = &**input {
let m_code = self.compile_select(name, predicate, schema)?;
Ok(YDBQuery {
m_code,
result_schema: schema.clone(),
})
} else {
Err(RealError::Backend(
"YottaDB backend currently supports selection on base relations only".into(),
))
}
}
Expr::Project { input, columns } => {
let inner = self.compile(input)?;
let output_schema = expr.infer_schema();
Ok(YDBQuery {
m_code: format!(
"; Projection\n{}\n; (Filter columns: {})",
inner.m_code,
columns.join(", ")
),
result_schema: output_schema,
})
}
Expr::Join { left, right, condition } => {
if let (Expr::Relation { name: left_name, schema: left_schema },
Expr::Relation { name: right_name, schema: right_schema }) = (&**left, &**right) {
let left_mapping = self.globals.get(left_name)
.ok_or_else(|| RealError::Schema(format!("Unknown relation: {}", left_name)))?;
let right_mapping = self.globals.get(right_name)
.ok_or_else(|| RealError::Schema(format!("Unknown relation: {}", right_name)))?;
let mut m_code = String::new();
m_code.push_str(&format!("; Join {} with {}\n", left_mapping.global, right_mapping.global));
m_code.push_str("NEW lid,rid\n");
m_code.push_str("SET lid=\"\"\n");
m_code.push_str(&format!("FOR SET lid=$ORDER({}(lid)) QUIT:lid=\"\" DO\n", left_mapping.global));
m_code.push_str(". SET rid=\"\"\n");
m_code.push_str(&format!(". FOR SET rid=$ORDER({}(rid)) QUIT:rid=\"\" DO\n", right_mapping.global));
m_code.push_str(". . ; Join condition check here\n");
m_code.push_str(". . WRITE lid,\"|\",rid,!\n");
Ok(YDBQuery {
m_code,
result_schema: Schema::new("join_result"),
})
} else {
Err(RealError::Backend(
"YottaDB join requires base relations".into()
))
}
}
Expr::Aggregate { input, group_by, aggregates } => {
let mut m_code = String::new();
m_code.push_str("; Aggregation\n");
m_code.push_str("NEW id,grp,val\n");
if let Expr::Relation { name, schema } = &**input {
let mapping = self.globals.get(name)
.ok_or_else(|| RealError::Schema(format!("Unknown relation: {}", name)))?;
m_code.push_str("SET id=\"\"\n");
m_code.push_str(&format!("FOR SET id=$ORDER({}(id)) QUIT:id=\"\" DO\n", mapping.global));
if group_by.is_empty() {
for agg in aggregates {
m_code.push_str(&format!(". SET val=$GET({}(id,\"{}\"))\n", mapping.global, agg.input));
match agg.func {
AggregateType::Count => m_code.push_str(&format!(". SET {}=$GET({})+1\n", agg.name, agg.name)),
AggregateType::Sum => m_code.push_str(&format!(". SET {}=$GET({})+val\n", agg.name, agg.name)),
AggregateType::Avg => {
m_code.push_str(&format!(". SET {}=$GET({})+val\n", agg.name, agg.name));
m_code.push_str(&format!(". SET {}count=$GET({}count)+1\n", agg.name, agg.name));
}
AggregateType::Min => m_code.push_str(&format!(". SET {}=$SELECT($GET({})=\"\":val,val<$GET({}):val,1:$GET({}))\n", agg.name, agg.name, agg.name, agg.name)),
AggregateType::Max => m_code.push_str(&format!(". SET {}=$SELECT($GET({})=\"\":val,val>$GET({}):val,1:$GET({}))\n", agg.name, agg.name, agg.name, agg.name)),
}
}
} else {
m_code.push_str(". ; Group by aggregation\n");
for grp_col in group_by {
m_code.push_str(&format!(". SET grp=$GET({}(id,\"{}\"))\n", mapping.global, grp_col));
}
}
m_code.push_str("; Output aggregation results\n");
for agg in aggregates {
match agg.func {
AggregateType::Avg => {
m_code.push_str(&format!("WRITE {}/{}count,\"|\"\n", agg.name, agg.name));
}
_ => {
m_code.push_str(&format!("WRITE {},\"|\"\n", agg.name));
}
}
}
m_code.push_str("WRITE !\n");
Ok(YDBQuery {
m_code,
result_schema: expr.infer_schema(),
})
} else {
Err(RealError::Backend(
"YottaDB aggregation requires base relation".into()
))
}
}
Expr::Union { left, right } => {
let left_query = self.compile(left)?;
let right_query = self.compile(right)?;
let m_code = format!(
"; Union\n{}\n{}\n; (Deduplicate results)",
left_query.m_code,
right_query.m_code
);
Ok(YDBQuery {
m_code,
result_schema: left_query.result_schema,
})
}
Expr::Intersect { left, right } => {
let left_query = self.compile(left)?;
let right_query = self.compile(right)?;
let m_code = format!(
"; Intersect\n{}\n{}\n; (Find common results)",
left_query.m_code,
right_query.m_code
);
Ok(YDBQuery {
m_code,
result_schema: left_query.result_schema,
})
}
Expr::Difference { left, right } => {
let left_query = self.compile(left)?;
let right_query = self.compile(right)?;
let m_code = format!(
"; Difference\n{}\n{}\n; (Left minus right)",
left_query.m_code,
right_query.m_code
);
Ok(YDBQuery {
m_code,
result_schema: left_query.result_schema,
})
}
Expr::Rename { input, from, to } => {
let inner = self.compile(input)?;
let m_code = format!(
"; Rename {} to {}\n{}",
from, to, inner.m_code
);
Ok(YDBQuery {
m_code,
result_schema: expr.infer_schema(),
})
}
Expr::Sort { input, columns } => {
let inner = self.compile(input)?;
let sort_spec = columns
.iter()
.map(|(col, order)| {
let order_str = match order {
crate::algebra::SortOrder::Asc => "ASC",
crate::algebra::SortOrder::Desc => "DESC",
};
format!("{} {}", col, order_str)
})
.collect::<Vec<_>>()
.join(", ");
let m_code = format!(
"; Sort by {}\n{}\n; (In-memory sort)",
sort_spec, inner.m_code
);
Ok(YDBQuery {
m_code,
result_schema: inner.result_schema,
})
}
Expr::Limit { input, count } => {
let inner = self.compile(input)?;
let m_code = format!(
"; Limit {}\n{}\n; (Take first {} results)",
count, inner.m_code, count
);
Ok(YDBQuery {
m_code,
result_schema: inner.result_schema,
})
}
Expr::Offset { input, count } => {
let inner = self.compile(input)?;
let m_code = format!(
"; Offset {}\n{}\n; (Skip first {} results)",
count, inner.m_code, count
);
Ok(YDBQuery {
m_code,
result_schema: inner.result_schema,
})
}
_ => {
Err(RealError::Backend(format!(
"YottaDB backend does not yet support: {:?}",
expr
)))
}
}
}
fn execute(&self, _conn: &mut Self::Connection, query: &Self::CompiledQuery) -> Result<ResultSet> {
println!("Would execute M code:\n{}", query.m_code);
Ok(Vec::new())
}
fn get_schema(&self, _conn: &mut Self::Connection, relation: &str) -> Result<Schema> {
self.globals
.get(relation)
.map(|m| m.schema.clone())
.ok_or_else(|| RealError::Schema(format!("Unknown relation: {}", relation)))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::{ColumnRef, CompareOp, Operand, Predicate};

    /// The `users(id, name, age)` schema shared by these tests.
    fn users_schema() -> Schema {
        Schema::new("users")
            .with_column("id", DataType::Integer)
            .with_column("name", DataType::String)
            .with_column("age", DataType::Integer)
    }

    /// A backend with `users` mapped onto `^Users`, keyed by the `id` column.
    fn users_backend(schema: &Schema) -> YottaDBBackend {
        let mut backend = YottaDBBackend::new();
        let mapping = GlobalMapping {
            global: "^Users".to_string(),
            schema: schema.clone(),
            key_structure: KeyStructure::Simple {
                id_column: "id".to_string(),
            },
        };
        backend.register_global("users".to_string(), mapping);
        backend
    }

    #[test]
    fn test_yottadb_compile_relation() {
        let schema = users_schema();
        let backend = users_backend(&schema);
        let compiled = backend
            .compile(&Expr::relation("users", schema))
            .unwrap();
        assert!(compiled.m_code.contains("^Users"), "scan must reference the mapped global");
        assert!(compiled.m_code.contains("$ORDER"), "scan must walk subscripts with $ORDER");
    }

    #[test]
    fn test_yottadb_compile_select() {
        let schema = users_schema();
        let backend = users_backend(&schema);
        let age_over_25 = Predicate::Compare {
            left: ColumnRef::new("age"),
            op: CompareOp::Gt,
            right: Operand::Literal(Value::Integer(25)),
        };
        let expr = Expr::relation("users", schema.clone()).select(age_over_25);
        let compiled = backend.compile(&expr).unwrap();
        assert!(compiled.m_code.contains("^Users"), "selection must reference the mapped global");
        assert!(compiled.m_code.contains("age>25"), "predicate must be compiled inline");
    }
}