use chrono::Utc;
use sqlx::{
Row, Sqlite,
query::{Query as SqlxQuery, QueryScalar},
sqlite::{SqliteArguments, SqlitePool, SqlitePoolOptions, SqliteRow},
};
use uuid::Uuid;
use crate::{
adapters::{
Adapter, EdgeQuery, EdgeRecord, EdgeTraversal, Error, ObjectRecord, Query,
TraversalDirection, UniqueAdapter,
},
query::{Cursor, IndexValue, IndexValueInner, QueryFilter},
};
/// SQLite-backed implementation of the storage [`Adapter`] interface.
///
/// Wraps a `sqlx` [`SqlitePool`]; every query in this file runs against the
/// pooled connections. Construct via `new_file`, `new_memory`, or `from_pool`,
/// then call `init_schema` once before use.
pub struct SqliteAdapter {
    // Shared connection pool; pub(crate) so sibling modules can reach the raw
    // pool directly (e.g. for migrations or tests).
    pub(crate) pool: SqlitePool,
}
impl SqliteAdapter {
pub async fn new_file(path: &str) -> Result<Self, Error> {
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect(&format!("sqlite:{}", path))
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(Self { pool })
}
pub async fn new_memory() -> Result<Self, Error> {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(Self { pool })
}
pub fn from_pool(pool: SqlitePool) -> Self {
Self { pool }
}
pub async fn init_schema(&self) -> Result<(), Error> {
let mut tx = self
.pool
.begin()
.await
.map_err(|err| Error::Storage(err.to_string()))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS objects (
id BLOB PRIMARY KEY,
type TEXT NOT NULL,
owner BLOB NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
data BLOB NOT NULL,
index_meta TEXT NOT NULL
)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_objects_type_owner ON objects(type, owner, id DESC)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_objects_type_owner_created ON objects(type, owner, created_at DESC)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_objects_type_owner_updated ON objects(type, owner, updated_at DESC)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS edges (
"from" BLOB NOT NULL,
"to" BLOB NOT NULL,
type TEXT NOT NULL,
data BLOB NOT NULL,
index_meta TEXT NOT NULL,
PRIMARY KEY ("from", "to", type)
)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_edges_from ON edges("from", type)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_edges_to ON edges("to", type)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS unique_constraints (
id BLOB NOT NULL,
type TEXT NOT NULL,
key TEXT NOT NULL UNIQUE,
field TEXT NOT NULL,
PRIMARY KEY (type, key)
)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_unique_id
ON unique_constraints(id)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS idx_unique_type_key
ON unique_constraints(type, key)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS sequences (
name TEXT PRIMARY KEY,
value INTEGER NOT NULL DEFAULT 1
)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS ousia_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"#,
)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
tx.commit()
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
}
}
impl SqliteAdapter {
    /// Decodes a row projected as `(id, type, owner, created_at, updated_at,
    /// data)` into an [`ObjectRecord`].
    ///
    /// "Slim" because `index_meta` is not part of the projection; it is
    /// filled in as `serde_json::Value::Null`.
    fn map_row_to_object_record_slim(row: SqliteRow) -> Result<ObjectRecord, Error> {
        let data: Vec<u8> = row
            .try_get("data")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let type_name = row
            .try_get::<String, _>("type")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let id = row
            .try_get::<Uuid, _>("id")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let owner = row
            .try_get::<Uuid, _>("owner")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let created_at_str: String = row
            .try_get("created_at")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let updated_at_str: String = row
            .try_get("updated_at")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        // Timestamps are stored as RFC 3339 text (see insert_object); parse
        // and normalize to UTC.
        let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
            .map_err(|e| Error::Deserialize(e.to_string()))?
            .with_timezone(&chrono::Utc);
        let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_str)
            .map_err(|e| Error::Deserialize(e.to_string()))?
            .with_timezone(&chrono::Utc);
        Ok(ObjectRecord {
            id,
            type_name: std::borrow::Cow::Owned(type_name),
            owner,
            created_at,
            updated_at,
            data,
            index_meta: serde_json::Value::Null,
        })
    }

    /// Decodes a row projected as `("from", "to", type, data, …)` into an
    /// [`EdgeRecord`]. `index_meta` is not decoded and is set to `Null`.
    fn map_row_to_edge_record(row: SqliteRow) -> Result<EdgeRecord, Error> {
        let data: Vec<u8> = row
            .try_get("data")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let type_name = row
            .try_get::<String, _>("type")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let from = row
            .try_get::<Uuid, _>("from")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        let to = row
            .try_get::<Uuid, _>("to")
            .map_err(|e| Error::Deserialize(e.to_string()))?;
        Ok(EdgeRecord {
            type_name: std::borrow::Cow::Owned(type_name),
            from,
            to,
            data,
            index_meta: serde_json::Value::Null,
        })
    }

    /// Decodes one joined row — edge columns aliased `edge_*`, object columns
    /// aliased `obj_*` — into an `(EdgeRecord, ObjectRecord)` pair.
    fn map_row_to_edge_and_object(row: SqliteRow) -> Result<(EdgeRecord, ObjectRecord), Error> {
        let de = |e: sqlx::Error| Error::Deserialize(e.to_string());
        let edge_data: Vec<u8> = row.try_get("edge_data").map_err(de)?;
        let obj_data: Vec<u8> = row.try_get("obj_data").map_err(de)?;
        let obj_created_str: String = row.try_get("obj_created_at").map_err(de)?;
        let obj_updated_str: String = row.try_get("obj_updated_at").map_err(de)?;
        let edge = EdgeRecord {
            type_name: std::borrow::Cow::Owned(row.try_get::<String, _>("edge_type").map_err(de)?),
            from: row.try_get::<Uuid, _>("edge_from").map_err(de)?,
            to: row.try_get::<Uuid, _>("edge_to").map_err(de)?,
            data: edge_data,
            index_meta: serde_json::Value::Null,
        };
        let obj = ObjectRecord {
            id: row.try_get::<Uuid, _>("obj_id").map_err(de)?,
            type_name: std::borrow::Cow::Owned(row.try_get::<String, _>("obj_type").map_err(de)?),
            owner: row.try_get::<Uuid, _>("obj_owner").map_err(de)?,
            created_at: chrono::DateTime::parse_from_rfc3339(&obj_created_str)
                .map_err(|e| Error::Deserialize(e.to_string()))?
                .with_timezone(&chrono::Utc),
            updated_at: chrono::DateTime::parse_from_rfc3339(&obj_updated_str)
                .map_err(|e| Error::Deserialize(e.to_string()))?
                .with_timezone(&chrono::Utc),
            data: obj_data,
            index_meta: serde_json::Value::Null,
        };
        // (A previously unused serde_json error-mapping closure `ds` was dead
        // code and has been removed.)
        Ok((edge, obj))
    }

    /// Traverses edges of `edge_type_name` anchored at `owner` and joins the
    /// object on the far side, returning `(edge, object)` pairs.
    ///
    /// The WHERE clause comes from
    /// [`Self::build_object_traversal_query_conditions`], whose placeholder
    /// order is: `o.type`, optional cursor (`o.id < ?`), object filters,
    /// `e.type`, anchor column, edge filters. The `bind` calls below follow
    /// that exact order.
    async fn query_edges_with_objects_inner(
        &self,
        edge_type_name: &str,
        type_name: &str,
        owner: Uuid,
        obj_filters: &[QueryFilter],
        plan: EdgeQuery,
        direction: TraversalDirection,
    ) -> Result<Vec<(EdgeRecord, ObjectRecord)>, Error> {
        let where_clause = Self::build_object_traversal_query_conditions(
            direction.clone(),
            obj_filters,
            &plan.filters,
            plan.cursor,
        );
        let order_clause = Self::build_edge_order_clause(&plan.filters);
        // Forward traversal follows e."to" to the far object; reverse follows
        // e."from".
        let join_col = match direction {
            TraversalDirection::Forward => "to",
            TraversalDirection::Reverse => "from",
        };
        let mut sql = format!(
            r#"
SELECT
e."from" AS edge_from, e."to" AS edge_to, e.type AS edge_type,
e.data AS edge_data,
o.id AS obj_id, o.type AS obj_type, o.owner AS obj_owner,
o.created_at AS obj_created_at, o.updated_at AS obj_updated_at,
o.data AS obj_data
FROM edges e
JOIN objects o ON e."{join_col}" = o.id
{where_clause}
{order_clause}
"#,
        );
        if let Some(limit) = plan.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        // BUGFIX: bind in SQL placeholder order — o.type, cursor, object
        // filters, e.type, anchor owner, edge filters. Previously the cursor
        // and object-filter values were bound after `e.type`/owner, shifting
        // every placeholder whenever a cursor or object filter was present.
        let mut query = sqlx::query(&sql).bind(type_name);
        if let Some(cursor) = plan.cursor {
            query = query.bind(cursor.last_id);
        }
        query = Self::query_bind_filters(query, obj_filters);
        query = query.bind(edge_type_name).bind(owner);
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        Ok(rows
            .into_iter()
            .filter_map(|row| Self::map_row_to_edge_and_object(row).ok())
            .collect())
    }

    /// Renders one *search* filter as a SQL condition over
    /// `json_extract(<alias>.index_meta, '$.<field>')`. Returns the condition
    /// plus the connective ("AND"/"OR") used to join it to the NEXT condition;
    /// sort/random filters yield `None`.
    fn build_filter_condition(alias: &str, filter: &QueryFilter) -> Option<(String, &'static str)> {
        let crate::query::QueryMode::Search(ref qs) = filter.mode else {
            return None;
        };
        let col = format!(
            "json_extract({}.index_meta, '$.{}')",
            alias, filter.field.name
        );
        let is_array = matches!(filter.value, IndexValue::Array(_));
        let condition = match &qs.comparison {
            crate::query::Comparison::Equal => format!("{} = ?", col),
            crate::query::Comparison::NotEqual => format!("{} != ?", col),
            crate::query::Comparison::GreaterThan => format!("{} > ?", col),
            crate::query::Comparison::LessThan => format!("{} < ?", col),
            crate::query::Comparison::GreaterThanOrEqual => format!("{} >= ?", col),
            crate::query::Comparison::LessThanOrEqual => format!("{} <= ?", col),
            // LIKE patterns get their wildcards appended at bind time in
            // query_bind_filters.
            crate::query::Comparison::BeginsWith => format!("{} LIKE ?", col),
            crate::query::Comparison::NotBeginsWith => format!("{} NOT LIKE ?", col),
            // Array-valued Contains checks set intersection through
            // json_each; scalar Contains falls back to a LIKE '%…%' match.
            crate::query::Comparison::Contains | crate::query::Comparison::ContainsAll => {
                if is_array {
                    format!("EXISTS (SELECT 1 FROM json_each({col}) WHERE value IN (SELECT value FROM json_each(?)))")
                } else {
                    format!("{} LIKE ?", col)
                }
            }
            crate::query::Comparison::NotContains | crate::query::Comparison::NotContainsAll => {
                if is_array {
                    format!("NOT EXISTS (SELECT 1 FROM json_each({col}) WHERE value IN (SELECT value FROM json_each(?)))")
                } else {
                    format!("{} NOT LIKE ?", col)
                }
            }
            crate::query::Comparison::NotIn => {
                format!("{} NOT IN (SELECT value FROM json_each(?))", col)
            }
        };
        let operator = match qs.operator {
            crate::query::Operator::And => "AND",
            _ => "OR",
        };
        Some((condition, operator))
    }

    /// Concatenates conditions, joining each pair with the LEFT element's
    /// connective; the final element's connective is unused.
    fn join_conditions(conditions: &[(String, &str)]) -> String {
        let mut out = String::new();
        for (i, (cond, op)) in conditions.iter().enumerate() {
            out.push_str(cond);
            if i < conditions.len() - 1 {
                out.push(' ');
                out.push_str(op);
                out.push(' ');
            }
        }
        out
    }

    /// WHERE clause for plain object queries. Placeholder order: `o.type`,
    /// `o.owner`, optional keyset cursor (`o.id < ?`), then one per search
    /// filter.
    fn build_object_query_conditions(filters: &[QueryFilter], cursor: Option<Cursor>) -> String {
        let mut conditions: Vec<(String, &str)> = vec![
            ("o.type = ?".to_string(), "AND"),
            ("o.owner = ?".to_string(), "AND"),
        ];
        if cursor.is_some() {
            conditions.push(("o.id < ?".to_string(), "AND"));
        }
        for filter in filters {
            if let Some((cond, op)) = Self::build_filter_condition("o", filter) {
                conditions.push((cond, op));
            }
        }
        format!("WHERE {}", Self::join_conditions(&conditions))
    }

    /// WHERE clause for edge-only queries. Placeholder order: `e.type`, the
    /// anchor column for `direction`, optional cursor on the far column, then
    /// edge search filters.
    fn build_edge_query_conditions(
        filters: &[QueryFilter],
        cursor: Option<Cursor>,
        direction: TraversalDirection,
    ) -> String {
        // The anchor column is the near side of the traversal; the cursor
        // paginates over the far side.
        let anchor_col = match direction {
            TraversalDirection::Forward => r#"e."from""#,
            TraversalDirection::Reverse => r#"e."to""#,
        };
        let cursor_col = match direction {
            TraversalDirection::Forward => r#"e."to""#,
            TraversalDirection::Reverse => r#"e."from""#,
        };
        let mut conditions: Vec<(String, &str)> = vec![
            ("e.type = ?".to_string(), "AND"),
            (format!("{} = ?", anchor_col), "AND"),
        ];
        if cursor.is_some() {
            conditions.push((format!("{} < ?", cursor_col), "AND"));
        }
        for filter in filters {
            if let Some((cond, op)) = Self::build_filter_condition("e", filter) {
                conditions.push((cond, op));
            }
        }
        format!("WHERE {}", Self::join_conditions(&conditions))
    }

    /// ORDER BY clause for object queries (defaults to `id DESC`).
    fn build_order_clause(filters: &[QueryFilter]) -> String {
        Self::build_order_clause_aliased(filters, "", false)
    }

    /// ORDER BY clause for edge queries (no default ordering).
    fn build_edge_order_clause(filters: &[QueryFilter]) -> String {
        Self::build_order_clause_aliased(filters, "e", true)
    }

    /// Builds an ORDER BY from any sort filters. A random-sort filter wins
    /// outright; `created_at`/`updated_at` sort on the real columns while all
    /// other fields sort on `json_extract` of `index_meta`. Array-valued sort
    /// fields are skipped. With no sort filters, objects fall back to
    /// `id DESC` and edges get no ORDER BY at all.
    fn build_order_clause_aliased(filters: &[QueryFilter], alias: &str, is_edge: bool) -> String {
        if filters.iter().any(|f| f.mode.is_random_sort()) {
            return "ORDER BY RANDOM()".to_string();
        }
        let prefix = if alias.is_empty() {
            String::new()
        } else {
            format!("{}.", alias)
        };
        let sort: Vec<&QueryFilter> = filters
            .iter()
            .filter(|f| f.mode.as_sort().is_some())
            .collect();
        if sort.is_empty() {
            if is_edge {
                return "".to_string();
            }
            return format!("ORDER BY {}id DESC", prefix);
        }
        let order_terms: Vec<String> = sort
            .iter()
            .filter(|s| s.value.as_array().is_none())
            .map(|s| {
                let dir = if s.mode.as_sort().unwrap().ascending {
                    "ASC"
                } else {
                    "DESC"
                };
                if matches!(s.field.name, "created_at" | "updated_at") {
                    return format!("{}{} {}", prefix, s.field.name, dir);
                }
                format!(
                    "json_extract({}index_meta, '$.{}') {}",
                    prefix, s.field.name, dir
                )
            })
            .collect();
        format!("ORDER BY {}", order_terms.join(", "))
    }

    /// WHERE clause for edge traversals joined to objects. Placeholder order:
    /// `o.type`, optional cursor (`o.id < ?`), object filters, `e.type`,
    /// anchor column, edge filters — callers MUST bind in exactly this order.
    fn build_object_traversal_query_conditions(
        direction: TraversalDirection,
        obj_filters: &[QueryFilter],
        edge_filters: &[QueryFilter],
        cursor: Option<Cursor>,
    ) -> String {
        let mut obj_conditions: Vec<(String, &str)> = vec![("o.type = ?".to_string(), "AND")];
        if cursor.is_some() {
            obj_conditions.push(("o.id < ?".to_string(), "AND"));
        }
        for filter in obj_filters {
            if let Some((cond, op)) = Self::build_filter_condition("o", filter) {
                obj_conditions.push((cond, op));
            }
        }
        // The "owner" of the traversal sits on the near-side edge column.
        let owner_col = match direction {
            TraversalDirection::Forward => r#"e."from""#,
            TraversalDirection::Reverse => r#"e."to""#,
        };
        let mut edge_conditions: Vec<(String, &str)> = vec![
            ("e.type = ?".to_string(), "AND"),
            (format!("{} = ?", owner_col), "AND"),
        ];
        for filter in edge_filters {
            if let Some((cond, op)) = Self::build_filter_condition("e", filter) {
                edge_conditions.push((cond, op));
            }
        }
        let obj_clause = Self::join_conditions(&obj_conditions);
        let edge_clause = Self::join_conditions(&edge_conditions);
        format!("WHERE {} AND ({})", obj_clause, edge_clause)
    }

    /// Binds the value of every *search* filter, in order. LIKE-based
    /// comparisons receive their `%` wildcards here; arrays are bound as a
    /// JSON-encoded string for use with `json_each(?)`.
    fn query_bind_filters<'a>(
        mut query: SqlxQuery<'a, Sqlite, SqliteArguments<'a>>,
        filters: &'a [QueryFilter],
    ) -> SqlxQuery<'a, Sqlite, SqliteArguments<'a>> {
        for filter in filters.iter().filter(|f| f.mode.as_search().is_some()) {
            query = match &filter.value {
                IndexValue::String(s) => {
                    use crate::query::Comparison::*;
                    match filter.mode.as_search().unwrap().comparison {
                        BeginsWith | NotBeginsWith => query.bind(format!("{}%", s)),
                        Contains | NotContains => query.bind(format!("%{}%", s)),
                        _ => query.bind(s),
                    }
                }
                IndexValue::Int(i) => query.bind(i),
                IndexValue::Float(f) => query.bind(f),
                IndexValue::Bool(b) => query.bind(b),
                IndexValue::Timestamp(t) => query.bind(t.to_rfc3339()),
                IndexValue::Uuid(uid) => query.bind(uid),
                IndexValue::Array(arr) => {
                    // The element type of the first entry decides how the
                    // whole array is serialized.
                    if let Some(first) = arr.first() {
                        match first {
                            IndexValueInner::String(_) => {
                                let values: Vec<&str> = arr
                                    .iter()
                                    .map(|s| s.as_string().unwrap_or_default())
                                    .collect();
                                query.bind(
                                    serde_json::to_string(&values)
                                        .unwrap_or_else(|_| "[]".to_string()),
                                )
                            }
                            IndexValueInner::Int(_) => {
                                let values: Vec<i64> =
                                    arr.iter().map(|s| s.as_int().unwrap_or_default()).collect();
                                query.bind(
                                    serde_json::to_string(&values)
                                        .unwrap_or_else(|_| "[]".to_string()),
                                )
                            }
                            IndexValueInner::Float(_) => {
                                let values: Vec<f64> = arr
                                    .iter()
                                    .map(|s| s.as_float().unwrap_or_default())
                                    .collect();
                                query.bind(
                                    serde_json::to_string(&values)
                                        .unwrap_or_else(|_| "[]".to_string()),
                                )
                            }
                        }
                    } else {
                        query.bind("[]".to_string())
                    }
                }
            };
        }
        query
    }

    /// Same binding logic as [`Self::query_bind_filters`], duplicated for
    /// `QueryScalar` because sqlx's `Query` and `QueryScalar` types share no
    /// common binding trait.
    fn query_scalar_bind_filters<'a, O>(
        mut query: QueryScalar<'a, Sqlite, O, SqliteArguments<'a>>,
        filters: &'a [QueryFilter],
    ) -> QueryScalar<'a, Sqlite, O, SqliteArguments<'a>> {
        for filter in filters.iter().filter(|f| f.mode.as_search().is_some()) {
            query = match &filter.value {
                IndexValue::String(s) => {
                    use crate::query::Comparison::*;
                    match filter.mode.as_search().unwrap().comparison {
                        BeginsWith | NotBeginsWith => query.bind(format!("{}%", s)),
                        Contains | NotContains => query.bind(format!("%{}%", s)),
                        _ => query.bind(s),
                    }
                }
                IndexValue::Int(i) => query.bind(i),
                IndexValue::Float(f) => query.bind(f),
                IndexValue::Bool(b) => query.bind(b),
                IndexValue::Timestamp(t) => query.bind(t.to_rfc3339()),
                IndexValue::Uuid(uid) => query.bind(uid),
                IndexValue::Array(arr) => {
                    if let Some(first) = arr.first() {
                        match first {
                            IndexValueInner::String(_) => {
                                let values: Vec<&str> = arr
                                    .iter()
                                    .map(|s| s.as_string().unwrap_or_default())
                                    .collect();
                                query.bind(
                                    serde_json::to_string(&values)
                                        .unwrap_or_else(|_| "[]".to_string()),
                                )
                            }
                            IndexValueInner::Int(_) => {
                                let values: Vec<i64> =
                                    arr.iter().map(|s| s.as_int().unwrap_or_default()).collect();
                                query.bind(
                                    serde_json::to_string(&values)
                                        .unwrap_or_else(|_| "[]".to_string()),
                                )
                            }
                            IndexValueInner::Float(_) => {
                                let values: Vec<f64> = arr
                                    .iter()
                                    .map(|s| s.as_float().unwrap_or_default())
                                    .collect();
                                query.bind(
                                    serde_json::to_string(&values)
                                        .unwrap_or_else(|_| "[]".to_string()),
                                )
                            }
                        }
                    } else {
                        query.bind("[]".to_string())
                    }
                }
            };
        }
        query
    }
}
impl SqliteAdapter {
async fn edge_traversal_inner(
&self,
edge_type_name: &str,
type_name: &str,
owner: Uuid,
filters: &[QueryFilter],
plan: EdgeQuery,
direction: TraversalDirection,
) -> Result<Vec<ObjectRecord>, Error> {
let where_clause = Self::build_object_traversal_query_conditions(
direction.clone(),
filters,
&plan.filters,
plan.cursor,
);
let order_clause = Self::build_edge_order_clause(&plan.filters);
let mut sql = format!(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM edges e
LEFT JOIN objects o ON e."{join_col}" = o.id
{where_clause}
{order_clause}
"#,
join_col = match direction {
TraversalDirection::Forward => "to",
TraversalDirection::Reverse => "from",
},
where_clause = where_clause,
order_clause = order_clause,
);
if let Some(limit) = plan.limit {
sql.push_str(&format!(" LIMIT {}", limit));
}
let mut query = sqlx::query(&sql)
.bind(type_name)
.bind(edge_type_name)
.bind(owner);
if let Some(cursor) = plan.cursor {
query = query.bind(cursor.last_id);
}
query = Self::query_bind_filters(query, filters);
query = Self::query_bind_filters(query, &plan.filters);
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
Ok(rows
.into_iter()
.filter_map(|row| Self::map_row_to_object_record_slim(row).ok())
.collect())
}
fn build_batch_traversal_conditions(
direction: TraversalDirection,
obj_filters: &[QueryFilter],
edge_filters: &[QueryFilter],
n: usize,
) -> String {
let placeholders = std::iter::repeat("?")
.take(n)
.collect::<Vec<_>>()
.join(", ");
let anchor = match direction {
TraversalDirection::Forward => r#"e."from""#,
TraversalDirection::Reverse => r#"e."to""#,
};
let mut obj_conditions: Vec<(String, &str)> = vec![("o.type = ?".to_string(), "AND")];
for f in obj_filters {
if let Some((c, op)) = Self::build_filter_condition("o", f) {
obj_conditions.push((c, op));
}
}
let mut edge_conditions: Vec<(String, &str)> = vec![
("e.type = ?".to_string(), "AND"),
(format!("{} IN ({})", anchor, placeholders), "AND"),
];
for f in edge_filters {
if let Some((c, op)) = Self::build_filter_condition("e", f) {
edge_conditions.push((c, op));
}
}
format!(
"WHERE {} AND ({})",
Self::join_conditions(&obj_conditions),
Self::join_conditions(&edge_conditions)
)
}
fn build_batch_edge_only_conditions(
direction: TraversalDirection,
edge_filters: &[QueryFilter],
n: usize,
) -> String {
let placeholders = std::iter::repeat("?")
.take(n)
.collect::<Vec<_>>()
.join(", ");
let anchor = match direction {
TraversalDirection::Forward => r#"e."from""#,
TraversalDirection::Reverse => r#"e."to""#,
};
let mut conditions: Vec<(String, &str)> = vec![
("e.type = ?".to_string(), "AND"),
(format!("{} IN ({})", anchor, placeholders), "AND"),
];
for f in edge_filters {
if let Some((c, op)) = Self::build_filter_condition("e", f) {
conditions.push((c, op));
}
}
format!("WHERE {}", Self::join_conditions(&conditions))
}
fn build_union_branch_with_obj_conditions(
direction: TraversalDirection,
obj_filters: &[QueryFilter],
edge_filters: &[QueryFilter],
) -> String {
let anchor = match direction {
TraversalDirection::Forward => r#"e."from""#,
TraversalDirection::Reverse => r#"e."to""#,
};
let mut obj_conditions: Vec<(String, &str)> = vec![("o.type = ?".to_string(), "AND")];
for f in obj_filters {
if let Some((c, op)) = Self::build_filter_condition("o", f) {
obj_conditions.push((c, op));
}
}
let mut edge_conditions: Vec<(String, &str)> = vec![
("e.type = ?".to_string(), "AND"),
(format!("{} = ?", anchor), "AND"),
];
for f in edge_filters {
if let Some((c, op)) = Self::build_filter_condition("e", f) {
edge_conditions.push((c, op));
}
}
format!(
"WHERE {} AND ({})",
Self::join_conditions(&obj_conditions),
Self::join_conditions(&edge_conditions)
)
}
fn build_union_branch_edge_only_conditions(
direction: TraversalDirection,
edge_filters: &[QueryFilter],
) -> String {
let anchor = match direction {
TraversalDirection::Forward => r#"e."from""#,
TraversalDirection::Reverse => r#"e."to""#,
};
let mut conditions: Vec<(String, &str)> = vec![
("e.type = ?".to_string(), "AND"),
(format!("{} = ?", anchor), "AND"),
];
for f in edge_filters {
if let Some((c, op)) = Self::build_filter_condition("e", f) {
conditions.push((c, op));
}
}
format!("WHERE {}", Self::join_conditions(&conditions))
}
async fn query_edges_internal(
&self,
type_name: &'static str,
owner: Uuid,
plan: EdgeQuery,
direction: TraversalDirection,
) -> Result<Vec<EdgeRecord>, Error> {
let where_clause = Self::build_edge_query_conditions(&plan.filters, plan.cursor, direction);
let order_clause = Self::build_edge_order_clause(&plan.filters);
let mut sql = format!(
r#"
SELECT e."from" AS "from", e."to" AS "to", e.type AS "type", e.data, e.index_meta
FROM edges e
{}
{}
"#,
where_clause, order_clause
);
if let Some(limit) = plan.limit {
sql.push_str(&format!(" LIMIT {}", limit));
}
let mut query = sqlx::query(&sql).bind(type_name).bind(owner);
if let Some(cursor) = plan.cursor {
query = query.bind(cursor.last_id);
}
query = Self::query_bind_filters(query, &plan.filters);
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
rows.into_iter().map(Self::map_row_to_edge_record).collect()
}
}
#[async_trait::async_trait]
impl Adapter for SqliteAdapter {
/// Inserts a brand-new object row.
///
/// Timestamps are serialized as RFC 3339 text and `index_meta` as a JSON
/// string. A uniqueness (primary-key) collision is surfaced as
/// `Error::UniqueConstraintViolation("id")`; any other failure becomes
/// `Error::Storage`.
async fn insert_object(&self, record: ObjectRecord) -> Result<(), Error> {
    let ObjectRecord {
        id,
        type_name,
        owner,
        created_at,
        updated_at,
        data,
        index_meta,
    } = record;
    let _ = sqlx::query(
        r#"
INSERT INTO objects (id, type, owner, created_at, updated_at, data, index_meta)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
    )
    .bind(id)
    .bind(type_name.as_ref())
    .bind(owner)
    .bind(created_at.to_rfc3339())
    .bind(updated_at.to_rfc3339())
    .bind(data.as_slice())
    .bind(serde_json::to_string(&index_meta).map_err(|e| Error::Serialize(e.to_string()))?)
    .execute(&self.pool)
    .await
    .map_err(|err| {
        // BUGFIX: SQLite reports "UNIQUE constraint failed: …" in uppercase,
        // so the previous case-sensitive contains("unique") never matched and
        // PK collisions were returned as generic Error::Storage. Compare
        // case-insensitively.
        if err.to_string().to_ascii_lowercase().contains("unique") {
            Error::UniqueConstraintViolation("id".to_string())
        } else {
            Error::Storage(err.to_string())
        }
    })?;
    Ok(())
}
async fn fetch_object(
&self,
type_name: &'static str,
id: Uuid,
) -> Result<Option<ObjectRecord>, Error> {
let row = sqlx::query(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE id = ? AND type = ?
"#,
)
.bind(id)
.bind(type_name)
.fetch_optional(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
match row {
Some(r) => Self::map_row_to_object_record_slim(r).map(Some),
None => Ok(None),
}
}
/// Fetches all objects of `type_name` whose id is in `ids`.
///
/// Guards against an empty `ids` list, which would otherwise render the
/// invalid SQL fragment `IN ()` (a syntax error in SQLite); the sibling
/// `fetch_owned_objects_batch` already guards the same way.
async fn fetch_bulk_objects(
    &self,
    type_name: &'static str,
    ids: Vec<Uuid>,
) -> Result<Vec<ObjectRecord>, Error> {
    if ids.is_empty() {
        return Ok(Vec::new());
    }
    let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
    let sql = format!(
        "SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data FROM objects o WHERE id IN ({}) AND type = ?",
        placeholders
    );
    let mut query = sqlx::query(&sql);
    for id in ids {
        query = query.bind(id);
    }
    query = query.bind(type_name);
    let rows = query
        .fetch_all(&self.pool)
        .await
        .map_err(|err| Error::Storage(err.to_string()))?;
    rows.into_iter()
        .map(Self::map_row_to_object_record_slim)
        .collect()
}
async fn fetch_objects_batch(
&self,
pairs: Vec<(&'static str, Vec<Uuid>)>,
) -> Result<Vec<ObjectRecord>, Error> {
let pairs: Vec<_> = pairs.into_iter().filter(|(_, ids)| !ids.is_empty()).collect();
if pairs.is_empty() {
return Ok(vec![]);
}
let mut branches = Vec::with_capacity(pairs.len());
for (_, ids) in &pairs {
let placeholders = (0..ids.len()).map(|_| "?").collect::<Vec<_>>().join(",");
branches.push(format!(
"SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data \
FROM objects o WHERE o.type = ? AND o.id IN ({})",
placeholders
));
}
let sql = branches.join("\nUNION ALL\n");
let mut query = sqlx::query(&sql);
for (type_name, ids) in &pairs {
query = query.bind(*type_name);
for id in ids {
query = query.bind(*id);
}
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
rows.into_iter()
.map(Self::map_row_to_object_record_slim)
.collect()
}
async fn update_object(&self, record: ObjectRecord) -> Result<(), Error> {
sqlx::query(
r#"
UPDATE objects
SET updated_at = ?, data = ?, index_meta = ?
WHERE id = ?
"#,
)
.bind(record.updated_at.to_rfc3339())
.bind(record.data.as_slice())
.bind(
serde_json::to_string(&record.index_meta)
.map_err(|e| Error::Serialize(e.to_string()))?,
)
.bind(record.id)
.execute(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
Ok(())
}
async fn transfer_object(
&self,
type_name: &'static str,
id: Uuid,
from_owner: Uuid,
to_owner: Uuid,
) -> Result<ObjectRecord, Error> {
let result = sqlx::query(
r#"
UPDATE objects
SET updated_at = ?, owner = ?
WHERE id = ? AND owner = ? AND type = ?
"#,
)
.bind(Utc::now().to_rfc3339())
.bind(to_owner)
.bind(id)
.bind(from_owner)
.bind(type_name)
.execute(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
if result.rows_affected() == 0 {
return Err(Error::NotFound);
}
self.fetch_object(type_name, id)
.await?
.ok_or(Error::NotFound)
}
/// Deletes object `id` if it exists and is owned by `owner`.
///
/// Returns the deleted record, or `Ok(None)` when the object is missing or
/// owned by someone else (no deletion happens in either case).
async fn delete_object(
    &self,
    type_name: &'static str,
    id: Uuid,
    owner: Uuid,
) -> Result<Option<ObjectRecord>, Error> {
    // Fetch first so the deleted record can be returned to the caller.
    let Some(existing) = self.fetch_object(type_name, id).await? else {
        return Ok(None);
    };
    if existing.owner != owner {
        return Ok(None);
    }
    sqlx::query(
        r#"
DELETE FROM objects
WHERE id = ? AND owner = ?
"#,
    )
    .bind(id)
    .bind(owner)
    .execute(&self.pool)
    .await
    .map_err(|err| Error::Storage(err.to_string()))?;
    Ok(Some(existing))
}
/// Deletes the given objects, scoped by type and owner, returning the number
/// of rows removed.
///
/// Guards against an empty `ids` list, which would otherwise render the
/// invalid SQL fragment `IN ()` (a syntax error in SQLite).
async fn delete_bulk_objects(
    &self,
    type_name: &'static str,
    ids: Vec<Uuid>,
    owner: Uuid,
) -> Result<u64, Error> {
    // Nothing to delete — and `IN ()` would not parse anyway.
    if ids.is_empty() {
        return Ok(0);
    }
    let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
    let sql = format!(
        "DELETE FROM objects WHERE id IN ({}) AND type = ? AND owner = ?",
        placeholders
    );
    let mut query = sqlx::query(&sql);
    for id in ids {
        query = query.bind(id);
    }
    query = query.bind(type_name);
    let result = query
        .bind(owner)
        .execute(&self.pool)
        .await
        .map_err(|err| Error::Storage(err.to_string()))?;
    Ok(result.rows_affected())
}
async fn delete_owned_objects(
&self,
type_name: &'static str,
owner: Uuid,
) -> Result<u64, Error> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Error::Storage(e.to_string()))?;
sqlx::query(
"DELETE FROM unique_constraints \
WHERE id IN (SELECT id FROM objects WHERE type = ? AND owner = ?)",
)
.bind(type_name)
.bind(owner)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
let result = sqlx::query("DELETE FROM objects WHERE type = ? AND owner = ?")
.bind(type_name)
.bind(owner)
.execute(&mut *tx)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
tx.commit()
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(result.rows_affected())
}
async fn find_object(
&self,
type_name: &'static str,
owner: Uuid,
filters: &[QueryFilter],
) -> Result<Option<ObjectRecord>, Error> {
let where_clause = Self::build_object_query_conditions(filters, None);
let order_clause = Self::build_order_clause(filters);
let sql = format!(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
{}
{}
LIMIT 1
"#,
where_clause, order_clause
);
let mut query = sqlx::query(&sql).bind(type_name).bind(owner);
query = Self::query_bind_filters(query, filters);
let row = query
.fetch_optional(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
match row {
Some(r) => Self::map_row_to_object_record_slim(r).map(Some),
None => Ok(None),
}
}
/// Lists objects of `type_name` for `plan.owner`, applying search filters,
/// sort filters, optional keyset cursor (`o.id < last_id`) and limit.
async fn query_objects(
    &self,
    type_name: &'static str,
    plan: Query,
) -> Result<Vec<ObjectRecord>, Error> {
    let mut where_clause = Self::build_object_query_conditions(&plan.filters, plan.cursor);
    let order_clause = Self::build_order_clause(&plan.filters);
    // A nil owner means "query across all owners": rewrite the equality into
    // `o.owner > ?` so the bound nil UUID compares below every real owner.
    // NOTE(review): fragile string surgery — depends on the exact
    // "o.owner = " spelling emitted by build_object_query_conditions.
    if plan.owner.is_nil() {
        where_clause = where_clause.replace("o.owner = ", "o.owner > ");
    }
    let mut sql = format!(
        r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
{}
{}
"#,
        where_clause, order_clause
    );
    if let Some(limit) = plan.limit {
        sql.push_str(&format!(" LIMIT {}", limit));
    }
    // Bind order mirrors build_object_query_conditions:
    // type, owner, optional cursor, then filter values.
    let mut query = sqlx::query(&sql).bind(type_name).bind(plan.owner);
    if let Some(cursor) = plan.cursor {
        query = query.bind(cursor.last_id);
    }
    query = Self::query_bind_filters(query, &plan.filters);
    let rows = query
        .fetch_all(&self.pool)
        .await
        .map_err(|err| Error::Storage(err.to_string()))?;
    rows.into_iter()
        .map(Self::map_row_to_object_record_slim)
        .collect()
}
/// Counts objects of `type_name`. With a `plan`, the count is scoped to
/// `plan.owner` and the plan's search filters (any cursor is ignored);
/// without one it counts every row of the type.
async fn count_objects(
    &self,
    type_name: &'static str,
    plan: Option<Query>,
) -> Result<u64, Error> {
    match plan {
        Some(plan) => {
            // Cursor deliberately omitted: counts cover the full filtered set.
            let where_clause = Self::build_object_query_conditions(&plan.filters, None);
            let mut sql = format!(
                r#"
SELECT COUNT(*) FROM objects o
{}
"#,
                where_clause
            );
            // NOTE(review): LIMIT on an aggregate is a no-op for any limit
            // >= 1 (COUNT(*) yields exactly one row); only Some(0) would
            // suppress the row and make fetch_one fail. Kept for behavioral
            // compatibility.
            if let Some(limit) = plan.limit {
                sql.push_str(&format!(" LIMIT {}", limit));
            }
            let mut query = sqlx::query_scalar::<_, i64>(&sql)
                .bind(type_name)
                .bind(plan.owner);
            query = Self::query_scalar_bind_filters(query, &plan.filters);
            let count = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| Error::Storage(e.to_string()))?;
            Ok(count as u64)
        }
        None => {
            let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM objects WHERE type = ?")
                .bind(type_name)
                .fetch_one(&self.pool)
                .await
                .map_err(|err| Error::Storage(err.to_string()))?;
            Ok(count as u64)
        }
    }
}
async fn fetch_owned_objects_batch(
&self,
type_name: &'static str,
owner_ids: &[Uuid],
) -> Result<Vec<ObjectRecord>, Error> {
if owner_ids.is_empty() {
return Ok(Vec::new());
}
let placeholders = owner_ids.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let sql = format!(
"SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data FROM objects o WHERE type = ? AND owner IN ({})",
placeholders
);
let mut query = sqlx::query(&sql).bind(type_name);
for id in owner_ids {
query = query.bind(*id);
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
rows.into_iter()
.map(Self::map_row_to_object_record_slim)
.collect()
}
async fn fetch_owned_objects(
&self,
type_name: &'static str,
owner: Uuid,
) -> Result<Vec<ObjectRecord>, Error> {
let rows = sqlx::query(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE owner = ? AND type = ?
"#,
)
.bind(owner)
.bind(type_name)
.fetch_all(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
rows.into_iter()
.map(Self::map_row_to_object_record_slim)
.collect()
}
async fn fetch_owned_object(
&self,
type_name: &'static str,
owner: Uuid,
) -> Result<Option<ObjectRecord>, Error> {
let row = sqlx::query(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE owner = ? AND type = ?
LIMIT 1
"#,
)
.bind(owner)
.bind(type_name)
.fetch_optional(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
match row {
Some(r) => Self::map_row_to_object_record_slim(r).map(Some),
None => Ok(None),
}
}
async fn fetch_union_object(
&self,
a_type_name: &'static str,
b_type_name: &'static str,
id: Uuid,
) -> Result<Option<ObjectRecord>, Error> {
let row = sqlx::query(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE id = ? AND (type = ? OR type = ?)
"#,
)
.bind(id)
.bind(a_type_name)
.bind(b_type_name)
.fetch_optional(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
match row {
Some(r) => Self::map_row_to_object_record_slim(r).map(Some),
None => Ok(None),
}
}
/// Fetches objects whose id is in `ids` and whose type is either
/// `a_type_name` or `b_type_name`.
///
/// Guards against an empty `ids` list, which would otherwise render the
/// invalid SQL fragment `IN ()` (a syntax error in SQLite).
async fn fetch_union_objects(
    &self,
    a_type_name: &'static str,
    b_type_name: &'static str,
    ids: Vec<Uuid>,
) -> Result<Vec<ObjectRecord>, Error> {
    if ids.is_empty() {
        return Ok(Vec::new());
    }
    let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
    let sql = format!(
        r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE id IN ({}) AND (type = ? OR type = ?)
"#,
        placeholders
    );
    let mut query = sqlx::query(&sql);
    for id in ids {
        query = query.bind(id);
    }
    let rows = query
        .bind(a_type_name)
        .bind(b_type_name)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| Error::Storage(err.to_string()))?;
    rows.into_iter()
        .map(Self::map_row_to_object_record_slim)
        .collect()
}
async fn fetch_owned_union_object(
&self,
a_type_name: &'static str,
b_type_name: &'static str,
owner: Uuid,
) -> Result<Option<ObjectRecord>, Error> {
let row = sqlx::query(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE owner = ? AND (type = ? OR type = ?)
"#,
)
.bind(owner)
.bind(a_type_name)
.bind(b_type_name)
.fetch_optional(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
match row {
Some(r) => Self::map_row_to_object_record_slim(r).map(Some),
None => Ok(None),
}
}
async fn fetch_owned_union_objects(
&self,
a_type_name: &'static str,
b_type_name: &'static str,
owner: Uuid,
) -> Result<Vec<ObjectRecord>, Error> {
let rows = sqlx::query(
r#"
SELECT o.id, o.type, o.owner, o.created_at, o.updated_at, o.data
FROM objects o
WHERE owner = ? AND (type = ? OR type = ?)
"#,
)
.bind(owner)
.bind(a_type_name)
.bind(b_type_name)
.fetch_all(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
rows.into_iter()
.map(Self::map_row_to_object_record_slim)
.collect()
}
/// Insert an edge, or overwrite its payload if the (`from`, `type`, `to`)
/// triple already exists.
///
/// `index_meta` is serialized to JSON before storage.
async fn insert_edge(&self, record: EdgeRecord) -> Result<(), Error> {
    let EdgeRecord {
        from,
        to,
        type_name,
        data,
        index_meta,
    } = record;
    let index_meta_str =
        serde_json::to_string(&index_meta).map_err(|e| Error::Serialize(e.to_string()))?;
    // Use the `excluded.` pseudo-table in the upsert (consistent with
    // upsert_schema_hash) instead of binding data/index_meta a second time.
    sqlx::query(
        r#"
        INSERT INTO edges ("from", "to", type, data, index_meta)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT ("from", type, "to")
        DO UPDATE SET data = excluded.data, index_meta = excluded.index_meta;
        "#,
    )
    .bind(from)
    .bind(to)
    .bind(type_name.as_ref())
    .bind(data.as_slice())
    .bind(&index_meta_str)
    .execute(&self.pool)
    .await
    .map_err(|err| Error::Storage(err.to_string()))?;
    Ok(())
}
async fn update_edge(
&self,
record: EdgeRecord,
old_to: Uuid,
to: Option<Uuid>,
) -> Result<(), Error> {
let EdgeRecord {
from,
type_name,
data,
..
} = record;
let _ = sqlx::query(
r#"
UPDATE edges SET data = ?, "to" = ?
WHERE "from" = ? AND type = ? AND "to" = ?
"#,
)
.bind(data.as_slice())
.bind(to.unwrap_or(old_to))
.bind(from)
.bind(type_name.as_ref())
.bind(old_to)
.execute(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
Ok(())
}
async fn delete_edge(
&self,
type_name: &'static str,
from: Uuid,
to: Uuid,
) -> Result<(), Error> {
let _ = sqlx::query(
r#"
DELETE FROM edges
WHERE type = ? AND "from" = ? AND "to" = ?
"#,
)
.bind(type_name)
.bind(from)
.bind(to)
.execute(&self.pool)
.await
.map_err(|err| Error::Storage(err.to_string()))?;
Ok(())
}
/// Delete every edge of `type_name` originating at `from`.
async fn delete_object_edge(&self, type_name: &'static str, from: Uuid) -> Result<(), Error> {
    sqlx::query(
        r#"
        DELETE FROM edges
        WHERE type = ? AND "from" = ?
        "#,
    )
    .bind(type_name)
    // Bind the Uuid directly: every other edge statement in this adapter
    // binds "from" as a Uuid, so the previous `from.to_string()` (TEXT)
    // could never match the stored value and the DELETE was a silent no-op.
    .bind(from)
    .execute(&self.pool)
    .await
    .map_err(|err| Error::Storage(err.to_string()))?;
    Ok(())
}
/// Fetch the single edge identified by (`type_name`, `from`, `to`).
///
/// Returns `Ok(None)` when no such edge exists.
async fn fetch_edge(
    &self,
    type_name: &'static str,
    from: Uuid,
    to: Uuid,
) -> Result<Option<EdgeRecord>, Error> {
    let row = sqlx::query(
        r#"
        SELECT e."from", e."to", e.type, e.data
        FROM edges e
        WHERE type = ? AND "from" = ? AND "to" = ?
        "#,
    )
    .bind(type_name)
    .bind(from)
    .bind(to)
    .fetch_optional(&self.pool)
    .await
    .map_err(|err| Error::Storage(err.to_string()))?;
    // Option<Row> -> Option<Result<_>> -> Result<Option<_>> (replaces the
    // redundant `.map(|e| Some(e))` closure).
    row.map(Self::map_row_to_edge_record).transpose()
}
/// Query forward edges of `type_name` originating at `owner`, constrained
/// by the filters/limit in `plan`. Thin wrapper over the shared internal
/// implementation with `TraversalDirection::Forward`.
async fn query_edges(
    &self,
    type_name: &'static str,
    owner: Uuid,
    plan: EdgeQuery,
) -> Result<Vec<EdgeRecord>, Error> {
    self.query_edges_internal(type_name, owner, plan, TraversalDirection::Forward)
        .await
}
/// Query edges of `type_name` pointing *to* `owner_reverse`, constrained by
/// `plan`. Thin wrapper with `TraversalDirection::Reverse`.
async fn query_reverse_edges(
    &self,
    type_name: &'static str,
    owner_reverse: Uuid,
    plan: EdgeQuery,
) -> Result<Vec<EdgeRecord>, Error> {
    self.query_edges_internal(type_name, owner_reverse, plan, TraversalDirection::Reverse)
        .await
}
/// Query forward edges from `owner` together with their joined target
/// objects of `obj_type`; `obj_filters` applies to the objects, `plan` to
/// the edges. Delegates to the shared forward/reverse implementation.
async fn query_edges_with_targets(
    &self,
    edge_type: &'static str,
    obj_type: &'static str,
    owner: Uuid,
    obj_filters: &[QueryFilter],
    plan: EdgeQuery,
) -> Result<Vec<(EdgeRecord, ObjectRecord)>, Error> {
    self.query_edges_with_objects_inner(
        edge_type,
        obj_type,
        owner,
        obj_filters,
        plan,
        TraversalDirection::Forward,
    )
    .await
}
/// Query edges pointing *to* `owner` together with their joined source
/// objects of `obj_type`. Reverse counterpart of
/// `query_edges_with_targets`, sharing the same inner implementation.
async fn query_reverse_edges_with_sources(
    &self,
    edge_type: &'static str,
    obj_type: &'static str,
    owner: Uuid,
    obj_filters: &[QueryFilter],
    plan: EdgeQuery,
) -> Result<Vec<(EdgeRecord, ObjectRecord)>, Error> {
    self.query_edges_with_objects_inner(
        edge_type,
        obj_type,
        owner,
        obj_filters,
        plan,
        TraversalDirection::Reverse,
    )
    .await
}
/// Count forward edges of `type_name` from `owner`, optionally constrained
/// by a filtered `plan` whose `limit` caps how many rows are counted.
async fn count_edges(
    &self,
    type_name: &'static str,
    owner: Uuid,
    plan: Option<EdgeQuery>,
) -> Result<u64, Error> {
    match plan {
        Some(plan) => {
            let where_clause = Self::build_edge_query_conditions(
                &plan.filters,
                None,
                TraversalDirection::Forward,
            );
            // A LIMIT attached directly to `SELECT COUNT(*)` is a no-op —
            // the aggregate always yields exactly one row, so plan.limit
            // previously never capped anything. Limit the rows being
            // counted inside a subquery instead.
            let mut inner = format!("SELECT 1 FROM edges {}", where_clause);
            if let Some(limit) = plan.limit {
                inner.push_str(&format!(" LIMIT {}", limit));
            }
            let sql = format!("SELECT COUNT(*) FROM ({})", inner);
            let mut query = sqlx::query_scalar::<_, i64>(&sql)
                .bind(type_name)
                .bind(owner);
            query = Self::query_scalar_bind_filters(query, &plan.filters);
            let count = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| Error::Storage(e.to_string()))?;
            Ok(count as u64)
        }
        None => {
            // Fast path: no filters, straight indexed count.
            let count: i64 = sqlx::query_scalar(
                r#"SELECT COUNT(*) FROM edges WHERE type = ? AND "from" = ?"#,
            )
            .bind(type_name)
            .bind(owner)
            .fetch_one(&self.pool)
            .await
            .map_err(|err| Error::Storage(err.to_string()))?;
            Ok(count as u64)
        }
    }
}
/// Count edges of `type_name` pointing *to* `to`, optionally constrained by
/// a filtered `plan` whose `limit` caps how many rows are counted.
async fn count_reverse_edges(
    &self,
    type_name: &'static str,
    to: Uuid,
    plan: Option<EdgeQuery>,
) -> Result<u64, Error> {
    match plan {
        Some(plan) => {
            let where_clause = Self::build_edge_query_conditions(
                &plan.filters,
                None,
                TraversalDirection::Reverse,
            );
            // As in count_edges: LIMIT on `SELECT COUNT(*)` never capped
            // anything; cap the counted rows via a subquery.
            let mut inner = format!("SELECT 1 FROM edges {}", where_clause);
            if let Some(limit) = plan.limit {
                inner.push_str(&format!(" LIMIT {}", limit));
            }
            let sql = format!("SELECT COUNT(*) FROM ({})", inner);
            let mut query = sqlx::query_scalar::<_, i64>(&sql).bind(type_name).bind(to);
            query = Self::query_scalar_bind_filters(query, &plan.filters);
            let count = query
                .fetch_one(&self.pool)
                .await
                .map_err(|e| Error::Storage(e.to_string()))?;
            Ok(count as u64)
        }
        None => {
            // Fast path: no filters, straight indexed count.
            let count: i64 =
                sqlx::query_scalar(r#"SELECT COUNT(*) FROM edges WHERE type = ? AND "to" = ?"#)
                    .bind(type_name)
                    .bind(to)
                    .fetch_one(&self.pool)
                    .await
                    .map_err(|err| Error::Storage(err.to_string()))?;
            Ok(count as u64)
        }
    }
}
async fn read_schema_hash(&self, type_name: &'static str) -> Result<Option<String>, Error> {
let key = format!("schema:{}", type_name);
sqlx::query_scalar::<_, String>("SELECT value FROM ousia_meta WHERE key = ?")
.bind(&key)
.fetch_optional(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))
}
async fn upsert_schema_hash(&self, type_name: &'static str, hash: &str) -> Result<(), Error> {
let key = format!("schema:{}", type_name);
sqlx::query(
"INSERT INTO ousia_meta (key, value) VALUES (?, ?) \
ON CONFLICT(key) DO UPDATE SET value = excluded.value",
)
.bind(&key)
.bind(hash)
.execute(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
}
/// Read the current value of the named sequence; unknown sequences report 1
/// (the COALESCE default), matching the seed logic in `sequence_next_value`.
///
/// Panics on storage failure — this signature returns a bare `u64`.
async fn sequence_value(&self, sq: String) -> u64 {
    let current: i64 =
        sqlx::query_scalar("SELECT COALESCE((SELECT value FROM sequences WHERE name = ?), 1)")
            .bind(&sq)
            .fetch_one(&self.pool)
            .await
            .expect("Failed to fetch the current sequence value");
    current as u64
}
/// Atomically advance the named sequence and return its new value.
///
/// A fresh sequence is seeded at 2 (so the first advance returns 2 while
/// `sequence_value` reports 1 for unknown names); an existing one is
/// incremented. Both the upsert and the read-back happen in one
/// transaction. Panics on storage failure — this signature returns `u64`.
async fn sequence_next_value(&self, sq: String) -> u64 {
    let mut tx = self
        .pool
        .begin()
        .await
        .expect("Failed to begin transaction");
    sqlx::query(
        "INSERT INTO sequences (name, value) VALUES (?, 2)
         ON CONFLICT (name) DO UPDATE SET value = sequences.value + 1",
    )
    .bind(&sq)
    .execute(&mut *tx)
    .await
    .expect("Failed to upsert sequence");
    let advanced: i64 = sqlx::query_scalar("SELECT value FROM sequences WHERE name = ?")
        .bind(&sq)
        .fetch_one(&mut *tx)
        .await
        .expect("Failed to fetch the next sequence value");
    tx.commit().await.expect("Failed to commit transaction");
    advanced as u64
}
}
#[async_trait::async_trait]
impl UniqueAdapter for SqliteAdapter {
async fn insert_unique_hashes(
&self,
type_name: &str,
object_id: Uuid,
hashes: Vec<(String, &str)>,
) -> Result<(), Error> {
let mut tx = self
.pool
.begin()
.await
.map_err(|err| Error::Storage(err.to_string()))?;
for (hash, field) in hashes {
sqlx::query(
r#"
INSERT INTO unique_constraints (id, type, key, field)
VALUES (?, ?, ?, ?)
"#,
)
.bind(object_id)
.bind(type_name)
.bind(hash)
.bind(&field)
.execute(&mut *tx)
.await
.map_err(|err| {
if err.to_string().contains("unique") {
Error::UniqueConstraintViolation(field.to_string())
} else {
Error::Storage(err.to_string())
}
})?;
}
tx.commit()
.await
.map_err(|err| Error::Storage(err.to_string()))?;
Ok(())
}
async fn delete_unique(&self, hash: &str) -> Result<(), Error> {
sqlx::query(
r#"
DELETE FROM unique_constraints WHERE key = ?
"#,
)
.bind(hash)
.execute(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
}
async fn delete_unique_hashes(&self, hashes: Vec<String>) -> Result<(), Error> {
let placeholders = hashes.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!(
"DELETE FROM unique_constraints WHERE key IN ({})",
placeholders
);
let mut query = sqlx::query(&sql);
for id in hashes {
query = query.bind(id);
}
query
.execute(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
}
async fn get_hashes_for_object(&self, object_id: Uuid) -> Result<Vec<String>, Error> {
let rows = sqlx::query(
r#"
SELECT key FROM unique_constraints WHERE id = ?
"#,
)
.bind(object_id)
.fetch_all(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(rows
.into_iter()
.map(|row| row.try_get("key").unwrap())
.collect())
}
async fn get_hashes_for_objects(&self, object_ids: Vec<Uuid>) -> Result<Vec<String>, Error> {
if object_ids.is_empty() {
return Ok(vec![]);
}
let placeholders = object_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let sql = format!(
"SELECT key FROM unique_constraints WHERE id IN ({})",
placeholders
);
let mut query = sqlx::query(&sql);
for id in &object_ids {
query = query.bind(id);
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(rows
.into_iter()
.map(|row| row.try_get("key").unwrap())
.collect())
}
}
#[async_trait::async_trait]
impl EdgeTraversal for SqliteAdapter {
    /// Forward traversal: return the target objects reachable from `owner`
    /// over `edge_type_name` edges, with `filters` applied to the objects
    /// and `plan` to the edges.
    async fn fetch_object_from_edge_traversal_internal(
        &self,
        edge_type_name: &str,
        type_name: &str,
        owner: Uuid,
        filters: &[QueryFilter],
        plan: EdgeQuery,
    ) -> Result<Vec<ObjectRecord>, Error> {
        self.edge_traversal_inner(
            edge_type_name,
            type_name,
            owner,
            filters,
            plan,
            TraversalDirection::Forward,
        )
        .await
    }

    /// Reverse traversal: same as above but follows edges that point *to*
    /// `owner`.
    async fn fetch_object_from_edge_reverse_traversal_internal(
        &self,
        edge_type_name: &str,
        type_name: &str,
        owner: Uuid,
        filters: &[QueryFilter],
        plan: EdgeQuery,
    ) -> Result<Vec<ObjectRecord>, Error> {
        self.edge_traversal_inner(
            edge_type_name,
            type_name,
            owner,
            filters,
            plan,
            TraversalDirection::Reverse,
        )
        .await
    }

    /// Batch forward traversal: for a set of source ids, return each
    /// matching edge together with its joined target object.
    async fn query_edges_with_targets_batch(
        &self,
        edge_type: &'static str,
        obj_type: &'static str,
        from_ids: &[Uuid],
        obj_filters: &[QueryFilter],
        plan: EdgeQuery,
    ) -> Result<Vec<(EdgeRecord, ObjectRecord)>, Error> {
        if from_ids.is_empty() {
            return Ok(Vec::new());
        }
        let where_clause = Self::build_batch_traversal_conditions(
            TraversalDirection::Forward,
            obj_filters,
            &plan.filters,
            from_ids.len(),
        );
        let order_clause = Self::build_edge_order_clause(&plan.filters);
        // Column aliases (edge_*, obj_*) must match map_row_to_edge_and_object.
        let mut sql = format!(
            r#"
            SELECT
                e."from" AS edge_from, e."to" AS edge_to, e.type AS edge_type,
                e.data AS edge_data,
                o.id AS obj_id, o.type AS obj_type, o.owner AS obj_owner,
                o.created_at AS obj_created_at, o.updated_at AS obj_updated_at, o.data AS obj_data
            FROM edges e
            JOIN objects o ON e."to" = o.id
            {where_clause}
            {order_clause}
            "#,
        );
        if let Some(limit) = plan.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        // Bind order follows the placeholder order emitted by
        // build_batch_traversal_conditions — presumably obj type, edge type,
        // the id set, object filters, then edge filters; confirm against the
        // builder if it changes.
        let mut query = sqlx::query(&sql).bind(obj_type).bind(edge_type);
        for id in from_ids {
            query = query.bind(*id);
        }
        query = Self::query_bind_filters(query, obj_filters);
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        // NOTE(review): rows that fail to decode are silently dropped
        // (filter_map over Result::ok) rather than surfaced as errors.
        Ok(rows
            .into_iter()
            .filter_map(|row| Self::map_row_to_edge_and_object(row).ok())
            .collect())
    }

    /// Batch reverse traversal: for a set of target ids, return each
    /// matching edge together with its joined source object.
    async fn query_reverse_edges_with_sources_batch(
        &self,
        edge_type: &'static str,
        obj_type: &'static str,
        to_ids: &[Uuid],
        obj_filters: &[QueryFilter],
        plan: EdgeQuery,
    ) -> Result<Vec<(EdgeRecord, ObjectRecord)>, Error> {
        if to_ids.is_empty() {
            return Ok(Vec::new());
        }
        let where_clause = Self::build_batch_traversal_conditions(
            TraversalDirection::Reverse,
            obj_filters,
            &plan.filters,
            to_ids.len(),
        );
        let order_clause = Self::build_edge_order_clause(&plan.filters);
        // Same SELECT list as the forward batch; join flips to e."from".
        let mut sql = format!(
            r#"
            SELECT
                e."from" AS edge_from, e."to" AS edge_to, e.type AS edge_type,
                e.data AS edge_data,
                o.id AS obj_id, o.type AS obj_type, o.owner AS obj_owner,
                o.created_at AS obj_created_at, o.updated_at AS obj_updated_at, o.data AS obj_data
            FROM edges e
            JOIN objects o ON e."from" = o.id
            {where_clause}
            {order_clause}
            "#,
        );
        if let Some(limit) = plan.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        let mut query = sqlx::query(&sql).bind(obj_type).bind(edge_type);
        for id in to_ids {
            query = query.bind(*id);
        }
        query = Self::query_bind_filters(query, obj_filters);
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        // NOTE(review): decode failures are silently dropped here as well.
        Ok(rows
            .into_iter()
            .filter_map(|row| Self::map_row_to_edge_and_object(row).ok())
            .collect())
    }

    /// Batch edge-only forward query: all matching edges from any of
    /// `from_ids`, without joining the target objects.
    async fn query_edges_batch(
        &self,
        edge_type: &'static str,
        from_ids: &[Uuid],
        plan: EdgeQuery,
    ) -> Result<Vec<EdgeRecord>, Error> {
        if from_ids.is_empty() {
            return Ok(Vec::new());
        }
        let where_clause = Self::build_batch_edge_only_conditions(
            TraversalDirection::Forward,
            &plan.filters,
            from_ids.len(),
        );
        let order_clause = Self::build_edge_order_clause(&plan.filters);
        let mut sql = format!(
            r#"
            SELECT e."from", e."to", e.type, e.data, e.index_meta
            FROM edges e
            {where_clause}
            {order_clause}
            "#,
        );
        if let Some(limit) = plan.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        // Bind order: edge type, then the id set, then edge filters —
        // must mirror build_batch_edge_only_conditions.
        let mut query = sqlx::query(&sql).bind(edge_type);
        for id in from_ids {
            query = query.bind(*id);
        }
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        // NOTE(review): decode failures are silently dropped.
        Ok(rows
            .into_iter()
            .filter_map(|row| Self::map_row_to_edge_record(row).ok())
            .collect())
    }

    /// Batch edge-only reverse query: all matching edges pointing to any of
    /// `to_ids`.
    async fn query_reverse_edges_batch(
        &self,
        edge_type: &'static str,
        to_ids: &[Uuid],
        plan: EdgeQuery,
    ) -> Result<Vec<EdgeRecord>, Error> {
        if to_ids.is_empty() {
            return Ok(Vec::new());
        }
        let where_clause = Self::build_batch_edge_only_conditions(
            TraversalDirection::Reverse,
            &plan.filters,
            to_ids.len(),
        );
        let order_clause = Self::build_edge_order_clause(&plan.filters);
        let mut sql = format!(
            r#"
            SELECT e."from", e."to", e.type, e.data, e.index_meta
            FROM edges e
            {where_clause}
            {order_clause}
            "#,
        );
        if let Some(limit) = plan.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        let mut query = sqlx::query(&sql).bind(edge_type);
        for id in to_ids {
            query = query.bind(*id);
        }
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        // NOTE(review): decode failures are silently dropped.
        Ok(rows
            .into_iter()
            .filter_map(|row| Self::map_row_to_edge_record(row).ok())
            .collect())
    }

    /// Fetch outgoing and incoming edges around one `pivot` id, each joined
    /// to its far-side object, in a single UNION ALL round trip. Returns
    /// (forward pairs, reverse pairs).
    ///
    /// NOTE(review): `plan.limit` is not applied in this combined query —
    /// confirm whether callers expect a cap per direction.
    async fn query_edges_both_directions_with_objects(
        &self,
        edge_type: &'static str,
        obj_type: &'static str,
        pivot: Uuid,
        obj_filters: &[QueryFilter],
        plan: EdgeQuery,
    ) -> Result<
        (
            Vec<(EdgeRecord, ObjectRecord)>,
            Vec<(EdgeRecord, ObjectRecord)>,
        ),
        Error,
    > {
        let fwd_where = Self::build_union_branch_with_obj_conditions(
            TraversalDirection::Forward,
            obj_filters,
            &plan.filters,
        );
        let rev_where = Self::build_union_branch_with_obj_conditions(
            TraversalDirection::Reverse,
            obj_filters,
            &plan.filters,
        );
        // Shared SELECT list; aliases match map_row_to_edge_and_object.
        let sel = r#"
        SELECT
            e."from" AS edge_from, e."to" AS edge_to, e.type AS edge_type,
            e.data AS edge_data,
            o.id AS obj_id, o.type AS obj_type, o.owner AS obj_owner,
            o.created_at AS obj_created_at, o.updated_at AS obj_updated_at, o.data AS obj_data
        "#;
        let sql = format!(
            "{sel} FROM edges e JOIN objects o ON e.\"to\" = o.id {fwd_where}
            UNION ALL
            {sel} FROM edges e JOIN objects o ON e.\"from\" = o.id {rev_where}",
        );
        // The identical parameter sequence is bound once per UNION branch.
        let mut query = sqlx::query(&sql).bind(obj_type).bind(edge_type).bind(pivot);
        query = Self::query_bind_filters(query, obj_filters);
        query = Self::query_bind_filters(query, &plan.filters);
        query = query.bind(obj_type).bind(edge_type).bind(pivot);
        query = Self::query_bind_filters(query, obj_filters);
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        let mut fwd: Vec<(EdgeRecord, ObjectRecord)> = Vec::new();
        let mut rev: Vec<(EdgeRecord, ObjectRecord)> = Vec::new();
        for row in rows {
            let edge_from: Uuid = row
                .try_get::<Uuid, _>("edge_from")
                .map_err(|e| Error::Deserialize(e.to_string()))?;
            let pair = Self::map_row_to_edge_and_object(row)?;
            // Classify by which side of the edge the pivot sits on.
            // NOTE(review): a self-loop (from == to == pivot) matches both
            // UNION branches and both rows land in `fwd` — confirm intended.
            if edge_from == pivot {
                fwd.push(pair);
            } else {
                rev.push(pair);
            }
        }
        Ok((fwd, rev))
    }

    /// Edge-only variant of the above: outgoing and incoming edges around
    /// `pivot` in one UNION ALL query, without joining objects.
    ///
    /// NOTE(review): as above, `plan.limit` is not applied here.
    async fn query_edges_both_directions(
        &self,
        edge_type: &'static str,
        pivot: Uuid,
        plan: EdgeQuery,
    ) -> Result<(Vec<EdgeRecord>, Vec<EdgeRecord>), Error> {
        let fwd_where = Self::build_union_branch_edge_only_conditions(
            TraversalDirection::Forward,
            &plan.filters,
        );
        let rev_where = Self::build_union_branch_edge_only_conditions(
            TraversalDirection::Reverse,
            &plan.filters,
        );
        let sql = format!(
            r#"SELECT e."from", e."to", e.type, e.data, e.index_meta FROM edges e {fwd_where}
            UNION ALL
            SELECT e."from", e."to", e.type, e.data, e.index_meta FROM edges e {rev_where}"#,
        );
        // Parameters bound once per UNION branch, same sequence each time.
        let mut query = sqlx::query(&sql).bind(edge_type).bind(pivot);
        query = Self::query_bind_filters(query, &plan.filters);
        query = query.bind(edge_type).bind(pivot);
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        let mut fwd: Vec<EdgeRecord> = Vec::new();
        let mut rev: Vec<EdgeRecord> = Vec::new();
        for row in rows {
            let edge_from: Uuid = row
                .try_get::<Uuid, _>("from")
                .map_err(|e| Error::Deserialize(e.to_string()))?;
            let record = Self::map_row_to_edge_record(row)?;
            // Same pivot-side classification (and self-loop caveat) as the
            // with-objects variant.
            if edge_from == pivot {
                fwd.push(record);
            } else {
                rev.push(record);
            }
        }
        Ok((fwd, rev))
    }

    /// Count forward edges per source id: returns (`from` id, count) pairs.
    /// Ids with zero matching edges produce no row (GROUP BY semantics).
    async fn count_edges_batch(
        &self,
        edge_type: &'static str,
        from_ids: &[Uuid],
        plan: EdgeQuery,
    ) -> Result<Vec<(Uuid, u64)>, Error> {
        if from_ids.is_empty() {
            return Ok(Vec::new());
        }
        let where_clause = Self::build_batch_edge_only_conditions(
            TraversalDirection::Forward,
            &plan.filters,
            from_ids.len(),
        );
        let sql = format!(
            r#"SELECT e."from", COUNT(*) AS cnt FROM edges e {where_clause} GROUP BY e."from""#,
        );
        let mut query = sqlx::query(&sql).bind(edge_type);
        for id in from_ids {
            query = query.bind(*id);
        }
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        rows.into_iter()
            .map(|row| {
                let id: Uuid = row
                    .try_get("from")
                    .map_err(|e| Error::Deserialize(e.to_string()))?;
                let cnt: i64 = row
                    .try_get("cnt")
                    .map_err(|e| Error::Deserialize(e.to_string()))?;
                Ok((id, cnt as u64))
            })
            .collect()
    }

    /// Count reverse edges per target id: returns (`to` id, count) pairs.
    /// Ids with zero matching edges produce no row (GROUP BY semantics).
    async fn count_reverse_edges_batch(
        &self,
        edge_type: &'static str,
        to_ids: &[Uuid],
        plan: EdgeQuery,
    ) -> Result<Vec<(Uuid, u64)>, Error> {
        if to_ids.is_empty() {
            return Ok(Vec::new());
        }
        let where_clause = Self::build_batch_edge_only_conditions(
            TraversalDirection::Reverse,
            &plan.filters,
            to_ids.len(),
        );
        let sql = format!(
            r#"SELECT e."to", COUNT(*) AS cnt FROM edges e {where_clause} GROUP BY e."to""#,
        );
        let mut query = sqlx::query(&sql).bind(edge_type);
        for id in to_ids {
            query = query.bind(*id);
        }
        query = Self::query_bind_filters(query, &plan.filters);
        let rows = query
            .fetch_all(&self.pool)
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        rows.into_iter()
            .map(|row| {
                let id: Uuid = row
                    .try_get("to")
                    .map_err(|e| Error::Deserialize(e.to_string()))?;
                let cnt: i64 = row
                    .try_get("cnt")
                    .map_err(|e| Error::Deserialize(e.to_string()))?;
                Ok((id, cnt as u64))
            })
            .collect()
    }
}