Skip to main content

hydracache_db/
hooks.rs

1use std::fmt;
2
3use thiserror::Error;
4
/// Artifact name recorded in the `hydracache_hook_schema` bookkeeping table.
pub const HOOK_SCHEMA_ARTIFACT: &str = "hydracache_hook_schema";
/// Version number written alongside [`HOOK_SCHEMA_ARTIFACT`] when hooks are rendered.
pub const HOOK_SCHEMA_VERSION: i64 = 1;
7
/// SQL dialect a hook plan is rendered for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HookDialect {
    /// PostgreSQL: trigger functions written in `plpgsql`.
    Postgres,
    /// MySQL: `for each row` triggers with `concat(...)` string building.
    MySql,
    /// SQLite: `create trigger if not exists` triggers.
    Sqlite,
}
14
/// Row-level write operation that a generated trigger reacts to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HookOperation {
    /// Fires after a row is inserted.
    Insert,
    /// Fires after a row is updated.
    Update,
    /// Fires after a row is deleted.
    Delete,
}

impl HookOperation {
    /// Lowercase SQL keyword for the operation, used in trigger names and in
    /// the `after <operation> on <table>` clause.
    fn as_str(self) -> &'static str {
        let keyword = match self {
            Self::Delete => "delete",
            Self::Update => "update",
            Self::Insert => "insert",
        };
        keyword
    }

    /// Name of the row alias the trigger body reads from: `OLD` for deletes
    /// and `NEW` for inserts and updates.
    ///
    /// Despite the name, every dialect renderer in this file uses it.
    fn sqlite_row_ref(self) -> &'static str {
        if matches!(self, Self::Delete) {
            "OLD"
        } else {
            "NEW"
        }
    }
}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum HookInvalidationTarget {
41    KeyColumn { column: String },
42    Tag { tag: String },
43    TagColumn { prefix: String, column: String },
44    Entity { entity: String, key_column: String },
45    Collection { collection: String },
46}
47
48impl HookInvalidationTarget {
49    pub fn key_column(column: impl Into<String>) -> Self {
50        Self::KeyColumn {
51            column: column.into(),
52        }
53    }
54
55    pub fn tag(tag: impl Into<String>) -> Self {
56        Self::Tag { tag: tag.into() }
57    }
58
59    pub fn tag_column(prefix: impl Into<String>, column: impl Into<String>) -> Self {
60        Self::TagColumn {
61            prefix: prefix.into(),
62            column: column.into(),
63        }
64    }
65
66    pub fn entity(entity: impl Into<String>, key_column: impl Into<String>) -> Self {
67        Self::Entity {
68            entity: entity.into(),
69            key_column: key_column.into(),
70        }
71    }
72
73    pub fn collection(collection: impl Into<String>) -> Self {
74        Self::Collection {
75            collection: collection.into(),
76        }
77    }
78
79    fn validate(&self) -> Result<(), HookError> {
80        match self {
81            Self::KeyColumn { column } if column.trim().is_empty() => {
82                Err(HookError::MissingColumn("key".to_owned()))
83            }
84            Self::TagColumn { prefix, column } => {
85                if prefix.trim().is_empty() {
86                    return Err(HookError::MissingLiteral("tag prefix".to_owned()));
87                }
88                if column.trim().is_empty() {
89                    return Err(HookError::MissingColumn("tag".to_owned()));
90                }
91                Ok(())
92            }
93            Self::Entity { entity, key_column } => {
94                if entity.trim().is_empty() {
95                    return Err(HookError::MissingLiteral("entity".to_owned()));
96                }
97                if key_column.trim().is_empty() {
98                    return Err(HookError::MissingColumn("entity key".to_owned()));
99                }
100                Ok(())
101            }
102            Self::Tag { tag } if tag.trim().is_empty() => {
103                Err(HookError::MissingLiteral("tag".to_owned()))
104            }
105            Self::Collection { collection } if collection.trim().is_empty() => {
106                Err(HookError::MissingLiteral("collection".to_owned()))
107            }
108            _ => Ok(()),
109        }
110    }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
114pub struct HookOp {
115    operation: HookOperation,
116    target: HookInvalidationTarget,
117}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct HookSchemaVersion {
121    pub artifact: String,
122    pub version: i64,
123    pub table: String,
124    pub dialect: HookDialect,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct HookPlan {
129    dialect: HookDialect,
130    table: String,
131    namespace: String,
132    ops: Vec<HookOp>,
133}
134
135impl HookPlan {
136    pub fn sqlite(table: &str) -> Self {
137        Self::new(HookDialect::Sqlite, table)
138    }
139
140    pub fn postgres(table: &str) -> Self {
141        Self::new(HookDialect::Postgres, table)
142    }
143
144    pub fn mysql(table: &str) -> Self {
145        Self::new(HookDialect::MySql, table)
146    }
147
148    pub fn new(dialect: HookDialect, table: &str) -> Self {
149        Self {
150            dialect,
151            table: table.to_owned(),
152            namespace: "db".to_owned(),
153            ops: Vec::new(),
154        }
155    }
156
157    pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
158        self.namespace = namespace.into();
159        self
160    }
161
162    pub fn on_insert(self, target: HookInvalidationTarget) -> Self {
163        self.on(HookOperation::Insert, target)
164    }
165
166    pub fn on_update(self, target: HookInvalidationTarget) -> Self {
167        self.on(HookOperation::Update, target)
168    }
169
170    pub fn on_delete(self, target: HookInvalidationTarget) -> Self {
171        self.on(HookOperation::Delete, target)
172    }
173
174    pub fn on(mut self, operation: HookOperation, target: HookInvalidationTarget) -> Self {
175        self.ops.push(HookOp { operation, target });
176        self
177    }
178
179    pub fn schema_version(&self) -> HookSchemaVersion {
180        HookSchemaVersion {
181            artifact: HOOK_SCHEMA_ARTIFACT.to_owned(),
182            version: HOOK_SCHEMA_VERSION,
183            table: self.table.clone(),
184            dialect: self.dialect,
185        }
186    }
187
188    pub fn render_sql(&self) -> Result<String, HookError> {
189        Ok(self.render_statements()?.join("\n\n"))
190    }
191
192    pub fn render_statements(&self) -> Result<Vec<String>, HookError> {
193        self.validate()?;
194        match self.dialect {
195            HookDialect::Sqlite => self.render_sqlite(),
196            HookDialect::Postgres => self.render_postgres(),
197            HookDialect::MySql => self.render_mysql(),
198        }
199    }
200
201    #[cfg(feature = "sqlx-outbox")]
202    pub async fn install_sqlite(&self, pool: &sqlx::SqlitePool) -> crate::Result<()> {
203        for statement in self.render_statements()? {
204            sqlx::query(&statement)
205                .execute(pool)
206                .await
207                .map_err(|error| {
208                    crate::DbCacheError::from(hydracache::CacheError::Backend(format!(
209                        "sqlite hook install error: {error}"
210                    )))
211                })?;
212        }
213        Ok(())
214    }
215
216    fn validate(&self) -> Result<(), HookError> {
217        validate_identifier(&self.table)?;
218        if self.namespace.trim().is_empty() {
219            return Err(HookError::MissingLiteral("namespace".to_owned()));
220        }
221        if self.ops.is_empty() {
222            return Err(HookError::NoOperations);
223        }
224        for op in &self.ops {
225            op.target.validate()?;
226        }
227        Ok(())
228    }
229
230    fn render_sqlite(&self) -> Result<Vec<String>, HookError> {
231        let mut statements = hook_schema_statements_sqlite(&self.table);
232        for operation in [
233            HookOperation::Insert,
234            HookOperation::Update,
235            HookOperation::Delete,
236        ] {
237            let targets: Vec<_> = self
238                .ops
239                .iter()
240                .filter(|op| op.operation == operation)
241                .map(|op| &op.target)
242                .collect();
243            if targets.is_empty() {
244                continue;
245            }
246
247            let trigger_name = format!("hydracache_{}_{}_outbox", self.table, operation.as_str());
248            let mut body = String::new();
249            for target in targets {
250                body.push_str(&render_sqlite_insert(
251                    &self.namespace,
252                    &self.table,
253                    operation,
254                    target,
255                )?);
256                body.push('\n');
257            }
258            statements.push(format!(
259                "create trigger if not exists {trigger_name}\nafter {} on {}\nbegin\n{}end;",
260                operation.as_str(),
261                self.table,
262                body
263            ));
264        }
265        Ok(statements)
266    }
267
268    fn render_postgres(&self) -> Result<Vec<String>, HookError> {
269        let mut statements = hook_schema_statements_postgres(&self.table);
270        for operation in [
271            HookOperation::Insert,
272            HookOperation::Update,
273            HookOperation::Delete,
274        ] {
275            let targets: Vec<_> = self
276                .ops
277                .iter()
278                .filter(|op| op.operation == operation)
279                .map(|op| &op.target)
280                .collect();
281            if targets.is_empty() {
282                continue;
283            }
284            let function_name =
285                format!("hydracache_{}_{}_outbox_fn", self.table, operation.as_str());
286            let trigger_name = format!("hydracache_{}_{}_outbox", self.table, operation.as_str());
287            let mut body = String::new();
288            for target in targets {
289                body.push_str(&render_postgres_insert(
290                    &self.namespace,
291                    &self.table,
292                    operation,
293                    target,
294                )?);
295                body.push('\n');
296            }
297            statements.push(format!(
298                "create or replace function {function_name}() returns trigger as $$\nbegin\n{}return {};\nend;\n$$ language plpgsql;",
299                body,
300                operation.sqlite_row_ref()
301            ));
302            statements.push(format!(
303                "drop trigger if exists {trigger_name} on {};\ncreate trigger {trigger_name}\nafter {} on {}\nfor each row execute function {function_name}();",
304                self.table,
305                operation.as_str(),
306                self.table
307            ));
308        }
309        Ok(statements)
310    }
311
312    fn render_mysql(&self) -> Result<Vec<String>, HookError> {
313        let mut statements = hook_schema_statements_mysql(&self.table);
314        for operation in [
315            HookOperation::Insert,
316            HookOperation::Update,
317            HookOperation::Delete,
318        ] {
319            let targets: Vec<_> = self
320                .ops
321                .iter()
322                .filter(|op| op.operation == operation)
323                .map(|op| &op.target)
324                .collect();
325            if targets.is_empty() {
326                continue;
327            }
328            let trigger_name = format!("hydracache_{}_{}_outbox", self.table, operation.as_str());
329            let mut body = String::new();
330            for target in targets {
331                body.push_str(&render_mysql_insert(
332                    &self.namespace,
333                    &self.table,
334                    operation,
335                    target,
336                )?);
337                body.push('\n');
338            }
339            statements.push(format!(
340                "drop trigger if exists {trigger_name};\ncreate trigger {trigger_name}\nafter {} on {}\nfor each row\nbegin\n{}end;",
341                operation.as_str(),
342                self.table,
343                body
344            ));
345        }
346        Ok(statements)
347    }
348}
349
350#[derive(Debug, Error)]
351pub enum HookError {
352    #[error("hook plan requires at least one operation")]
353    NoOperations,
354    #[error("invalid SQL identifier `{0}`")]
355    InvalidIdentifier(String),
356    #[error("hook target is missing required {0} column")]
357    MissingColumn(String),
358    #[error("hook target is missing required {0}")]
359    MissingLiteral(String),
360}
361
362impl From<HookError> for crate::DbCacheError {
363    fn from(error: HookError) -> Self {
364        hydracache::CacheError::Backend(format!("hydracache hook error: {error}")).into()
365    }
366}
367
368fn hook_schema_statements_sqlite(table: &str) -> Vec<String> {
369    vec![
370        "create table if not exists hydracache_hook_schema (
371    artifact text primary key,
372    version integer not null,
373    table_name text not null,
374    installed_at_ms integer not null
375)"
376        .to_owned(),
377        format!(
378            "insert or replace into hydracache_hook_schema (artifact, version, table_name, installed_at_ms)
379values ('{HOOK_SCHEMA_ARTIFACT}', {HOOK_SCHEMA_VERSION}, '{}', cast(strftime('%s', 'now') as integer) * 1000)",
380            sql_literal(table)
381        ),
382    ]
383}
384
385fn hook_schema_statements_postgres(table: &str) -> Vec<String> {
386    vec![
387        "create table if not exists hydracache_hook_schema (
388    artifact text primary key,
389    version bigint not null,
390    table_name text not null,
391    installed_at_ms bigint not null
392);"
393        .to_owned(),
394        format!(
395            "insert into hydracache_hook_schema (artifact, version, table_name, installed_at_ms)
396values ('{HOOK_SCHEMA_ARTIFACT}', {HOOK_SCHEMA_VERSION}, '{}', (extract(epoch from clock_timestamp()) * 1000)::bigint)
397on conflict (artifact) do update set version = excluded.version, table_name = excluded.table_name, installed_at_ms = excluded.installed_at_ms;",
398            sql_literal(table)
399        ),
400    ]
401}
402
403fn hook_schema_statements_mysql(table: &str) -> Vec<String> {
404    vec![
405        "create table if not exists hydracache_hook_schema (
406    artifact varchar(191) primary key,
407    version bigint not null,
408    table_name varchar(191) not null,
409    installed_at_ms bigint not null
410);"
411        .to_owned(),
412        format!(
413            "insert into hydracache_hook_schema (artifact, version, table_name, installed_at_ms)
414values ('{HOOK_SCHEMA_ARTIFACT}', {HOOK_SCHEMA_VERSION}, '{}', unix_timestamp(current_timestamp(3)) * 1000)
415on duplicate key update version = values(version), table_name = values(table_name), installed_at_ms = values(installed_at_ms);",
416            sql_literal(table)
417        ),
418    ]
419}
420
421fn render_sqlite_insert(
422    namespace: &str,
423    table: &str,
424    operation: HookOperation,
425    target: &HookInvalidationTarget,
426) -> Result<String, HookError> {
427    let row = operation.sqlite_row_ref();
428    let target = SqlTargetExpr::sqlite(row, target)?;
429    let commit = format!(
430        "'hook:{table}:{}:' || {}",
431        operation.as_str(),
432        target.value_expr
433    );
434    let target_hash = target.hash_expr.clone();
435    Ok(format!(
436        "insert or ignore into hydracache_invalidation_outbox (
437    id, namespace, commit_position, target_hash, intent_kind,
438    cache_key, cache_tag, entity_name, collection_name, reason,
439    created_at_ms, available_at_ms
440) values (
441    'hook:{namespace}:' || {commit} || ':' || {target_hash},
442    '{namespace}',
443    {commit},
444    {target_hash},
445    '{}',
446    {},
447    {},
448    {},
449    {},
450    'hydracache hook {table} {}',
451    cast(strftime('%s', 'now') as integer) * 1000,
452    cast(strftime('%s', 'now') as integer) * 1000
453);",
454        target.intent_kind,
455        target.cache_key_expr,
456        target.cache_tag_expr,
457        target.entity_name_expr,
458        target.collection_name_expr,
459        operation.as_str()
460    ))
461}
462
463fn render_postgres_insert(
464    namespace: &str,
465    table: &str,
466    operation: HookOperation,
467    target: &HookInvalidationTarget,
468) -> Result<String, HookError> {
469    let row = operation.sqlite_row_ref();
470    let target = SqlTargetExpr::postgres(row, target)?;
471    let commit = format!(
472        "'hook:{table}:{}:' || {}",
473        operation.as_str(),
474        target.value_expr
475    );
476    Ok(format!(
477        "insert into hydracache_invalidation_outbox (
478    id, namespace, commit_position, target_hash, intent_kind,
479    cache_key, cache_tag, entity_name, collection_name, reason,
480    created_at_ms, available_at_ms
481) values (
482    'hook:{namespace}:' || {commit} || ':' || {hash},
483    '{namespace}',
484    {commit},
485    {hash},
486    '{kind}',
487    {cache_key},
488    {cache_tag},
489    {entity_name},
490    {collection_name},
491    'hydracache hook {table} {op}',
492    (extract(epoch from clock_timestamp()) * 1000)::bigint,
493    (extract(epoch from clock_timestamp()) * 1000)::bigint
494) on conflict (namespace, commit_position, target_hash) do nothing;",
495        hash = target.hash_expr,
496        kind = target.intent_kind,
497        cache_key = target.cache_key_expr,
498        cache_tag = target.cache_tag_expr,
499        entity_name = target.entity_name_expr,
500        collection_name = target.collection_name_expr,
501        op = operation.as_str(),
502    ))
503}
504
505fn render_mysql_insert(
506    namespace: &str,
507    table: &str,
508    operation: HookOperation,
509    target: &HookInvalidationTarget,
510) -> Result<String, HookError> {
511    let row = operation.sqlite_row_ref();
512    let target = SqlTargetExpr::mysql(row, target)?;
513    let commit = format!(
514        "concat('hook:{table}:{}:', {})",
515        operation.as_str(),
516        target.value_expr
517    );
518    Ok(format!(
519        "insert ignore into hydracache_invalidation_outbox (
520    id, namespace, commit_position, target_hash, intent_kind,
521    cache_key, cache_tag, entity_name, collection_name, reason,
522    created_at_ms, available_at_ms
523) values (
524    concat('hook:{namespace}:', {commit}, ':', {hash}),
525    '{namespace}',
526    {commit},
527    {hash},
528    '{kind}',
529    {cache_key},
530    {cache_tag},
531    {entity_name},
532    {collection_name},
533    'hydracache hook {table} {op}',
534    unix_timestamp(current_timestamp(3)) * 1000,
535    unix_timestamp(current_timestamp(3)) * 1000
536);",
537        hash = target.hash_expr,
538        kind = target.intent_kind,
539        cache_key = target.cache_key_expr,
540        cache_tag = target.cache_tag_expr,
541        entity_name = target.entity_name_expr,
542        collection_name = target.collection_name_expr,
543        op = operation.as_str(),
544    ))
545}
546
547struct SqlTargetExpr {
548    intent_kind: &'static str,
549    value_expr: String,
550    hash_expr: String,
551    cache_key_expr: String,
552    cache_tag_expr: String,
553    entity_name_expr: String,
554    collection_name_expr: String,
555}
556
557impl SqlTargetExpr {
558    fn sqlite(row: &str, target: &HookInvalidationTarget) -> Result<Self, HookError> {
559        Self::new(row, target, SqlFlavor::Sqlite)
560    }
561
562    fn postgres(row: &str, target: &HookInvalidationTarget) -> Result<Self, HookError> {
563        Self::new(row, target, SqlFlavor::Postgres)
564    }
565
566    fn mysql(row: &str, target: &HookInvalidationTarget) -> Result<Self, HookError> {
567        Self::new(row, target, SqlFlavor::MySql)
568    }
569
570    fn new(
571        row: &str,
572        target: &HookInvalidationTarget,
573        flavor: SqlFlavor,
574    ) -> Result<Self, HookError> {
575        target.validate()?;
576        let null = "null".to_owned();
577        match target {
578            HookInvalidationTarget::KeyColumn { column } => {
579                validate_identifier(column)?;
580                let value = flavor.column_text(row, column);
581                Ok(Self {
582                    intent_kind: "key",
583                    value_expr: value.clone(),
584                    hash_expr: flavor.concat(&["'key:'", &value]),
585                    cache_key_expr: value,
586                    cache_tag_expr: null.clone(),
587                    entity_name_expr: null.clone(),
588                    collection_name_expr: null,
589                })
590            }
591            HookInvalidationTarget::Tag { tag } => {
592                let value = flavor.literal(tag);
593                Ok(Self {
594                    intent_kind: "tag",
595                    value_expr: value.clone(),
596                    hash_expr: flavor.concat(&["'tag:'", &value]),
597                    cache_key_expr: null.clone(),
598                    cache_tag_expr: value,
599                    entity_name_expr: null.clone(),
600                    collection_name_expr: null,
601                })
602            }
603            HookInvalidationTarget::TagColumn { prefix, column } => {
604                validate_identifier(column)?;
605                let column = flavor.column_text(row, column);
606                let value = flavor.concat(&[&flavor.literal(&format!("{prefix}:")), &column]);
607                Ok(Self {
608                    intent_kind: "tag",
609                    value_expr: value.clone(),
610                    hash_expr: flavor.concat(&["'tag:'", &value]),
611                    cache_key_expr: null.clone(),
612                    cache_tag_expr: value,
613                    entity_name_expr: null.clone(),
614                    collection_name_expr: null,
615                })
616            }
617            HookInvalidationTarget::Entity { entity, key_column } => {
618                validate_identifier(key_column)?;
619                let key = flavor.column_text(row, key_column);
620                let entity_literal = flavor.literal(entity);
621                Ok(Self {
622                    intent_kind: "entity",
623                    value_expr: key.clone(),
624                    hash_expr: flavor.concat(&["'entity:'", &entity_literal, "':'", &key]),
625                    cache_key_expr: key,
626                    cache_tag_expr: null.clone(),
627                    entity_name_expr: entity_literal,
628                    collection_name_expr: null,
629                })
630            }
631            HookInvalidationTarget::Collection { collection } => {
632                let value = flavor.literal(collection);
633                Ok(Self {
634                    intent_kind: "collection",
635                    value_expr: value.clone(),
636                    hash_expr: flavor.concat(&["'collection:'", &value]),
637                    cache_key_expr: null.clone(),
638                    cache_tag_expr: null.clone(),
639                    entity_name_expr: null.clone(),
640                    collection_name_expr: value,
641                })
642            }
643        }
644    }
645}
646
647#[derive(Debug, Clone, Copy)]
648enum SqlFlavor {
649    Sqlite,
650    Postgres,
651    MySql,
652}
653
654impl SqlFlavor {
655    fn literal(self, value: &str) -> String {
656        format!("'{}'", sql_literal(value))
657    }
658
659    fn column_text(self, row: &str, column: &str) -> String {
660        match self {
661            Self::Sqlite | Self::Postgres => format!("cast({row}.{column} as text)"),
662            Self::MySql => format!("cast({row}.{column} as char)"),
663        }
664    }
665
666    fn concat(self, parts: &[&str]) -> String {
667        match self {
668            Self::Sqlite | Self::Postgres => parts.join(" || "),
669            Self::MySql => format!("concat({})", parts.join(", ")),
670        }
671    }
672}
673
674fn validate_identifier(identifier: &str) -> Result<(), HookError> {
675    let valid = !identifier.is_empty()
676        && identifier
677            .chars()
678            .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
679        && identifier
680            .chars()
681            .next()
682            .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');
683    if valid {
684        Ok(())
685    } else {
686        Err(HookError::InvalidIdentifier(identifier.to_owned()))
687    }
688}
689
/// Escapes `value` for use inside a single-quoted SQL string literal by
/// doubling every single quote. The surrounding quotes are not added.
fn sql_literal(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
693
694impl fmt::Display for HookDialect {
695    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
696        formatter.write_str(match self {
697            Self::Postgres => "postgres",
698            Self::MySql => "mysql",
699            Self::Sqlite => "sqlite",
700        })
701    }
702}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// The reported schema version carries the crate-level artifact and
    /// version constants together with the plan's own table and dialect.
    #[test]
    fn schema_version_names_table_and_dialect() {
        let expected = HookSchemaVersion {
            artifact: HOOK_SCHEMA_ARTIFACT.to_owned(),
            version: HOOK_SCHEMA_VERSION,
            table: "users".to_owned(),
            dialect: HookDialect::Sqlite,
        };

        let actual = HookPlan::sqlite("users").schema_version();

        assert_eq!(actual, expected);
    }
}