1use std::fmt;
2
3use thiserror::Error;
4
/// Artifact name written to the `artifact` column of the hook schema table and
/// reported by `HookPlan::schema_version`.
pub const HOOK_SCHEMA_ARTIFACT: &str = "hydracache_hook_schema";
/// Schema version recorded next to [`HOOK_SCHEMA_ARTIFACT`] when hook statements are rendered.
pub const HOOK_SCHEMA_VERSION: i64 = 1;
7
/// SQL dialect a `HookPlan` renders its statements for.
///
/// The `Display` implementation yields the lowercase dialect name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HookDialect {
    /// Render PostgreSQL statements (trigger functions written in PL/pgSQL).
    Postgres,
    /// Render MySQL statements.
    MySql,
    /// Render SQLite statements.
    Sqlite,
}
14
/// Row-level operation a trigger reacts to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HookOperation {
    Insert,
    Update,
    Delete,
}

impl HookOperation {
    /// Lowercase keyword for this operation, used in trigger names and in the
    /// `after <operation> on <table>` clause.
    fn as_str(self) -> &'static str {
        match self {
            HookOperation::Delete => "delete",
            HookOperation::Update => "update",
            HookOperation::Insert => "insert",
        }
    }

    /// Row alias the trigger body reads from: `OLD` for deletes, `NEW` otherwise.
    ///
    /// Despite the name, every dialect renderer in this file uses it.
    fn sqlite_row_ref(self) -> &'static str {
        match self {
            HookOperation::Delete => "OLD",
            HookOperation::Insert | HookOperation::Update => "NEW",
        }
    }
}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum HookInvalidationTarget {
41 KeyColumn { column: String },
42 Tag { tag: String },
43 TagColumn { prefix: String, column: String },
44 Entity { entity: String, key_column: String },
45 Collection { collection: String },
46}
47
48impl HookInvalidationTarget {
49 pub fn key_column(column: impl Into<String>) -> Self {
50 Self::KeyColumn {
51 column: column.into(),
52 }
53 }
54
55 pub fn tag(tag: impl Into<String>) -> Self {
56 Self::Tag { tag: tag.into() }
57 }
58
59 pub fn tag_column(prefix: impl Into<String>, column: impl Into<String>) -> Self {
60 Self::TagColumn {
61 prefix: prefix.into(),
62 column: column.into(),
63 }
64 }
65
66 pub fn entity(entity: impl Into<String>, key_column: impl Into<String>) -> Self {
67 Self::Entity {
68 entity: entity.into(),
69 key_column: key_column.into(),
70 }
71 }
72
73 pub fn collection(collection: impl Into<String>) -> Self {
74 Self::Collection {
75 collection: collection.into(),
76 }
77 }
78
79 fn validate(&self) -> Result<(), HookError> {
80 match self {
81 Self::KeyColumn { column } if column.trim().is_empty() => {
82 Err(HookError::MissingColumn("key".to_owned()))
83 }
84 Self::TagColumn { prefix, column } => {
85 if prefix.trim().is_empty() {
86 return Err(HookError::MissingLiteral("tag prefix".to_owned()));
87 }
88 if column.trim().is_empty() {
89 return Err(HookError::MissingColumn("tag".to_owned()));
90 }
91 Ok(())
92 }
93 Self::Entity { entity, key_column } => {
94 if entity.trim().is_empty() {
95 return Err(HookError::MissingLiteral("entity".to_owned()));
96 }
97 if key_column.trim().is_empty() {
98 return Err(HookError::MissingColumn("entity key".to_owned()));
99 }
100 Ok(())
101 }
102 Self::Tag { tag } if tag.trim().is_empty() => {
103 Err(HookError::MissingLiteral("tag".to_owned()))
104 }
105 Self::Collection { collection } if collection.trim().is_empty() => {
106 Err(HookError::MissingLiteral("collection".to_owned()))
107 }
108 _ => Ok(()),
109 }
110 }
111}
112
/// One registered pairing of a row operation with the invalidation target it emits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HookOp {
    /// Row operation the trigger listens for.
    operation: HookOperation,
    /// Invalidation written to the outbox when that operation fires.
    target: HookInvalidationTarget,
}
118
/// Description of the hook schema a plan corresponds to, as returned by
/// `HookPlan::schema_version`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HookSchemaVersion {
    /// Artifact name (`HOOK_SCHEMA_ARTIFACT`).
    pub artifact: String,
    /// Schema version number (`HOOK_SCHEMA_VERSION`).
    pub version: i64,
    /// Table the plan installs hooks on.
    pub table: String,
    /// Dialect the plan renders for.
    pub dialect: HookDialect,
}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct HookPlan {
129 dialect: HookDialect,
130 table: String,
131 namespace: String,
132 ops: Vec<HookOp>,
133}
134
135impl HookPlan {
136 pub fn sqlite(table: &str) -> Self {
137 Self::new(HookDialect::Sqlite, table)
138 }
139
140 pub fn postgres(table: &str) -> Self {
141 Self::new(HookDialect::Postgres, table)
142 }
143
144 pub fn mysql(table: &str) -> Self {
145 Self::new(HookDialect::MySql, table)
146 }
147
148 pub fn new(dialect: HookDialect, table: &str) -> Self {
149 Self {
150 dialect,
151 table: table.to_owned(),
152 namespace: "db".to_owned(),
153 ops: Vec::new(),
154 }
155 }
156
157 pub fn namespace(mut self, namespace: impl Into<String>) -> Self {
158 self.namespace = namespace.into();
159 self
160 }
161
162 pub fn on_insert(self, target: HookInvalidationTarget) -> Self {
163 self.on(HookOperation::Insert, target)
164 }
165
166 pub fn on_update(self, target: HookInvalidationTarget) -> Self {
167 self.on(HookOperation::Update, target)
168 }
169
170 pub fn on_delete(self, target: HookInvalidationTarget) -> Self {
171 self.on(HookOperation::Delete, target)
172 }
173
174 pub fn on(mut self, operation: HookOperation, target: HookInvalidationTarget) -> Self {
175 self.ops.push(HookOp { operation, target });
176 self
177 }
178
179 pub fn schema_version(&self) -> HookSchemaVersion {
180 HookSchemaVersion {
181 artifact: HOOK_SCHEMA_ARTIFACT.to_owned(),
182 version: HOOK_SCHEMA_VERSION,
183 table: self.table.clone(),
184 dialect: self.dialect,
185 }
186 }
187
188 pub fn render_sql(&self) -> Result<String, HookError> {
189 Ok(self.render_statements()?.join("\n\n"))
190 }
191
192 pub fn render_statements(&self) -> Result<Vec<String>, HookError> {
193 self.validate()?;
194 match self.dialect {
195 HookDialect::Sqlite => self.render_sqlite(),
196 HookDialect::Postgres => self.render_postgres(),
197 HookDialect::MySql => self.render_mysql(),
198 }
199 }
200
201 #[cfg(feature = "sqlx-outbox")]
202 pub async fn install_sqlite(&self, pool: &sqlx::SqlitePool) -> crate::Result<()> {
203 for statement in self.render_statements()? {
204 sqlx::query(&statement)
205 .execute(pool)
206 .await
207 .map_err(|error| {
208 crate::DbCacheError::from(hydracache::CacheError::Backend(format!(
209 "sqlite hook install error: {error}"
210 )))
211 })?;
212 }
213 Ok(())
214 }
215
216 fn validate(&self) -> Result<(), HookError> {
217 validate_identifier(&self.table)?;
218 if self.namespace.trim().is_empty() {
219 return Err(HookError::MissingLiteral("namespace".to_owned()));
220 }
221 if self.ops.is_empty() {
222 return Err(HookError::NoOperations);
223 }
224 for op in &self.ops {
225 op.target.validate()?;
226 }
227 Ok(())
228 }
229
230 fn render_sqlite(&self) -> Result<Vec<String>, HookError> {
231 let mut statements = hook_schema_statements_sqlite(&self.table);
232 for operation in [
233 HookOperation::Insert,
234 HookOperation::Update,
235 HookOperation::Delete,
236 ] {
237 let targets: Vec<_> = self
238 .ops
239 .iter()
240 .filter(|op| op.operation == operation)
241 .map(|op| &op.target)
242 .collect();
243 if targets.is_empty() {
244 continue;
245 }
246
247 let trigger_name = format!("hydracache_{}_{}_outbox", self.table, operation.as_str());
248 let mut body = String::new();
249 for target in targets {
250 body.push_str(&render_sqlite_insert(
251 &self.namespace,
252 &self.table,
253 operation,
254 target,
255 )?);
256 body.push('\n');
257 }
258 statements.push(format!(
259 "create trigger if not exists {trigger_name}\nafter {} on {}\nbegin\n{}end;",
260 operation.as_str(),
261 self.table,
262 body
263 ));
264 }
265 Ok(statements)
266 }
267
268 fn render_postgres(&self) -> Result<Vec<String>, HookError> {
269 let mut statements = hook_schema_statements_postgres(&self.table);
270 for operation in [
271 HookOperation::Insert,
272 HookOperation::Update,
273 HookOperation::Delete,
274 ] {
275 let targets: Vec<_> = self
276 .ops
277 .iter()
278 .filter(|op| op.operation == operation)
279 .map(|op| &op.target)
280 .collect();
281 if targets.is_empty() {
282 continue;
283 }
284 let function_name =
285 format!("hydracache_{}_{}_outbox_fn", self.table, operation.as_str());
286 let trigger_name = format!("hydracache_{}_{}_outbox", self.table, operation.as_str());
287 let mut body = String::new();
288 for target in targets {
289 body.push_str(&render_postgres_insert(
290 &self.namespace,
291 &self.table,
292 operation,
293 target,
294 )?);
295 body.push('\n');
296 }
297 statements.push(format!(
298 "create or replace function {function_name}() returns trigger as $$\nbegin\n{}return {};\nend;\n$$ language plpgsql;",
299 body,
300 operation.sqlite_row_ref()
301 ));
302 statements.push(format!(
303 "drop trigger if exists {trigger_name} on {};\ncreate trigger {trigger_name}\nafter {} on {}\nfor each row execute function {function_name}();",
304 self.table,
305 operation.as_str(),
306 self.table
307 ));
308 }
309 Ok(statements)
310 }
311
312 fn render_mysql(&self) -> Result<Vec<String>, HookError> {
313 let mut statements = hook_schema_statements_mysql(&self.table);
314 for operation in [
315 HookOperation::Insert,
316 HookOperation::Update,
317 HookOperation::Delete,
318 ] {
319 let targets: Vec<_> = self
320 .ops
321 .iter()
322 .filter(|op| op.operation == operation)
323 .map(|op| &op.target)
324 .collect();
325 if targets.is_empty() {
326 continue;
327 }
328 let trigger_name = format!("hydracache_{}_{}_outbox", self.table, operation.as_str());
329 let mut body = String::new();
330 for target in targets {
331 body.push_str(&render_mysql_insert(
332 &self.namespace,
333 &self.table,
334 operation,
335 target,
336 )?);
337 body.push('\n');
338 }
339 statements.push(format!(
340 "drop trigger if exists {trigger_name};\ncreate trigger {trigger_name}\nafter {} on {}\nfor each row\nbegin\n{}end;",
341 operation.as_str(),
342 self.table,
343 body
344 ));
345 }
346 Ok(statements)
347 }
348}
349
/// Reasons a hook plan cannot be rendered.
#[derive(Debug, Error)]
pub enum HookError {
    /// The plan has no registered `(operation, target)` pair.
    #[error("hook plan requires at least one operation")]
    NoOperations,
    /// A table or column name is not a plain ASCII identifier; carries the rejected name.
    #[error("invalid SQL identifier `{0}`")]
    InvalidIdentifier(String),
    /// A target's column field is blank; carries a label for which column.
    #[error("hook target is missing required {0} column")]
    MissingColumn(String),
    /// A literal field (namespace, tag, entity, ...) is blank; carries a label for which one.
    #[error("hook target is missing required {0}")]
    MissingLiteral(String),
}
361
362impl From<HookError> for crate::DbCacheError {
363 fn from(error: HookError) -> Self {
364 hydracache::CacheError::Backend(format!("hydracache hook error: {error}")).into()
365 }
366}
367
368fn hook_schema_statements_sqlite(table: &str) -> Vec<String> {
369 vec![
370 "create table if not exists hydracache_hook_schema (
371 artifact text primary key,
372 version integer not null,
373 table_name text not null,
374 installed_at_ms integer not null
375)"
376 .to_owned(),
377 format!(
378 "insert or replace into hydracache_hook_schema (artifact, version, table_name, installed_at_ms)
379values ('{HOOK_SCHEMA_ARTIFACT}', {HOOK_SCHEMA_VERSION}, '{}', cast(strftime('%s', 'now') as integer) * 1000)",
380 sql_literal(table)
381 ),
382 ]
383}
384
385fn hook_schema_statements_postgres(table: &str) -> Vec<String> {
386 vec![
387 "create table if not exists hydracache_hook_schema (
388 artifact text primary key,
389 version bigint not null,
390 table_name text not null,
391 installed_at_ms bigint not null
392);"
393 .to_owned(),
394 format!(
395 "insert into hydracache_hook_schema (artifact, version, table_name, installed_at_ms)
396values ('{HOOK_SCHEMA_ARTIFACT}', {HOOK_SCHEMA_VERSION}, '{}', (extract(epoch from clock_timestamp()) * 1000)::bigint)
397on conflict (artifact) do update set version = excluded.version, table_name = excluded.table_name, installed_at_ms = excluded.installed_at_ms;",
398 sql_literal(table)
399 ),
400 ]
401}
402
403fn hook_schema_statements_mysql(table: &str) -> Vec<String> {
404 vec![
405 "create table if not exists hydracache_hook_schema (
406 artifact varchar(191) primary key,
407 version bigint not null,
408 table_name varchar(191) not null,
409 installed_at_ms bigint not null
410);"
411 .to_owned(),
412 format!(
413 "insert into hydracache_hook_schema (artifact, version, table_name, installed_at_ms)
414values ('{HOOK_SCHEMA_ARTIFACT}', {HOOK_SCHEMA_VERSION}, '{}', unix_timestamp(current_timestamp(3)) * 1000)
415on duplicate key update version = values(version), table_name = values(table_name), installed_at_ms = values(installed_at_ms);",
416 sql_literal(table)
417 ),
418 ]
419}
420
421fn render_sqlite_insert(
422 namespace: &str,
423 table: &str,
424 operation: HookOperation,
425 target: &HookInvalidationTarget,
426) -> Result<String, HookError> {
427 let row = operation.sqlite_row_ref();
428 let target = SqlTargetExpr::sqlite(row, target)?;
429 let commit = format!(
430 "'hook:{table}:{}:' || {}",
431 operation.as_str(),
432 target.value_expr
433 );
434 let target_hash = target.hash_expr.clone();
435 Ok(format!(
436 "insert or ignore into hydracache_invalidation_outbox (
437 id, namespace, commit_position, target_hash, intent_kind,
438 cache_key, cache_tag, entity_name, collection_name, reason,
439 created_at_ms, available_at_ms
440) values (
441 'hook:{namespace}:' || {commit} || ':' || {target_hash},
442 '{namespace}',
443 {commit},
444 {target_hash},
445 '{}',
446 {},
447 {},
448 {},
449 {},
450 'hydracache hook {table} {}',
451 cast(strftime('%s', 'now') as integer) * 1000,
452 cast(strftime('%s', 'now') as integer) * 1000
453);",
454 target.intent_kind,
455 target.cache_key_expr,
456 target.cache_tag_expr,
457 target.entity_name_expr,
458 target.collection_name_expr,
459 operation.as_str()
460 ))
461}
462
463fn render_postgres_insert(
464 namespace: &str,
465 table: &str,
466 operation: HookOperation,
467 target: &HookInvalidationTarget,
468) -> Result<String, HookError> {
469 let row = operation.sqlite_row_ref();
470 let target = SqlTargetExpr::postgres(row, target)?;
471 let commit = format!(
472 "'hook:{table}:{}:' || {}",
473 operation.as_str(),
474 target.value_expr
475 );
476 Ok(format!(
477 "insert into hydracache_invalidation_outbox (
478 id, namespace, commit_position, target_hash, intent_kind,
479 cache_key, cache_tag, entity_name, collection_name, reason,
480 created_at_ms, available_at_ms
481) values (
482 'hook:{namespace}:' || {commit} || ':' || {hash},
483 '{namespace}',
484 {commit},
485 {hash},
486 '{kind}',
487 {cache_key},
488 {cache_tag},
489 {entity_name},
490 {collection_name},
491 'hydracache hook {table} {op}',
492 (extract(epoch from clock_timestamp()) * 1000)::bigint,
493 (extract(epoch from clock_timestamp()) * 1000)::bigint
494) on conflict (namespace, commit_position, target_hash) do nothing;",
495 hash = target.hash_expr,
496 kind = target.intent_kind,
497 cache_key = target.cache_key_expr,
498 cache_tag = target.cache_tag_expr,
499 entity_name = target.entity_name_expr,
500 collection_name = target.collection_name_expr,
501 op = operation.as_str(),
502 ))
503}
504
505fn render_mysql_insert(
506 namespace: &str,
507 table: &str,
508 operation: HookOperation,
509 target: &HookInvalidationTarget,
510) -> Result<String, HookError> {
511 let row = operation.sqlite_row_ref();
512 let target = SqlTargetExpr::mysql(row, target)?;
513 let commit = format!(
514 "concat('hook:{table}:{}:', {})",
515 operation.as_str(),
516 target.value_expr
517 );
518 Ok(format!(
519 "insert ignore into hydracache_invalidation_outbox (
520 id, namespace, commit_position, target_hash, intent_kind,
521 cache_key, cache_tag, entity_name, collection_name, reason,
522 created_at_ms, available_at_ms
523) values (
524 concat('hook:{namespace}:', {commit}, ':', {hash}),
525 '{namespace}',
526 {commit},
527 {hash},
528 '{kind}',
529 {cache_key},
530 {cache_tag},
531 {entity_name},
532 {collection_name},
533 'hydracache hook {table} {op}',
534 unix_timestamp(current_timestamp(3)) * 1000,
535 unix_timestamp(current_timestamp(3)) * 1000
536);",
537 hash = target.hash_expr,
538 kind = target.intent_kind,
539 cache_key = target.cache_key_expr,
540 cache_tag = target.cache_tag_expr,
541 entity_name = target.entity_name_expr,
542 collection_name = target.collection_name_expr,
543 op = operation.as_str(),
544 ))
545}
546
547struct SqlTargetExpr {
548 intent_kind: &'static str,
549 value_expr: String,
550 hash_expr: String,
551 cache_key_expr: String,
552 cache_tag_expr: String,
553 entity_name_expr: String,
554 collection_name_expr: String,
555}
556
557impl SqlTargetExpr {
558 fn sqlite(row: &str, target: &HookInvalidationTarget) -> Result<Self, HookError> {
559 Self::new(row, target, SqlFlavor::Sqlite)
560 }
561
562 fn postgres(row: &str, target: &HookInvalidationTarget) -> Result<Self, HookError> {
563 Self::new(row, target, SqlFlavor::Postgres)
564 }
565
566 fn mysql(row: &str, target: &HookInvalidationTarget) -> Result<Self, HookError> {
567 Self::new(row, target, SqlFlavor::MySql)
568 }
569
570 fn new(
571 row: &str,
572 target: &HookInvalidationTarget,
573 flavor: SqlFlavor,
574 ) -> Result<Self, HookError> {
575 target.validate()?;
576 let null = "null".to_owned();
577 match target {
578 HookInvalidationTarget::KeyColumn { column } => {
579 validate_identifier(column)?;
580 let value = flavor.column_text(row, column);
581 Ok(Self {
582 intent_kind: "key",
583 value_expr: value.clone(),
584 hash_expr: flavor.concat(&["'key:'", &value]),
585 cache_key_expr: value,
586 cache_tag_expr: null.clone(),
587 entity_name_expr: null.clone(),
588 collection_name_expr: null,
589 })
590 }
591 HookInvalidationTarget::Tag { tag } => {
592 let value = flavor.literal(tag);
593 Ok(Self {
594 intent_kind: "tag",
595 value_expr: value.clone(),
596 hash_expr: flavor.concat(&["'tag:'", &value]),
597 cache_key_expr: null.clone(),
598 cache_tag_expr: value,
599 entity_name_expr: null.clone(),
600 collection_name_expr: null,
601 })
602 }
603 HookInvalidationTarget::TagColumn { prefix, column } => {
604 validate_identifier(column)?;
605 let column = flavor.column_text(row, column);
606 let value = flavor.concat(&[&flavor.literal(&format!("{prefix}:")), &column]);
607 Ok(Self {
608 intent_kind: "tag",
609 value_expr: value.clone(),
610 hash_expr: flavor.concat(&["'tag:'", &value]),
611 cache_key_expr: null.clone(),
612 cache_tag_expr: value,
613 entity_name_expr: null.clone(),
614 collection_name_expr: null,
615 })
616 }
617 HookInvalidationTarget::Entity { entity, key_column } => {
618 validate_identifier(key_column)?;
619 let key = flavor.column_text(row, key_column);
620 let entity_literal = flavor.literal(entity);
621 Ok(Self {
622 intent_kind: "entity",
623 value_expr: key.clone(),
624 hash_expr: flavor.concat(&["'entity:'", &entity_literal, "':'", &key]),
625 cache_key_expr: key,
626 cache_tag_expr: null.clone(),
627 entity_name_expr: entity_literal,
628 collection_name_expr: null,
629 })
630 }
631 HookInvalidationTarget::Collection { collection } => {
632 let value = flavor.literal(collection);
633 Ok(Self {
634 intent_kind: "collection",
635 value_expr: value.clone(),
636 hash_expr: flavor.concat(&["'collection:'", &value]),
637 cache_key_expr: null.clone(),
638 cache_tag_expr: null.clone(),
639 entity_name_expr: null.clone(),
640 collection_name_expr: value,
641 })
642 }
643 }
644 }
645}
646
647#[derive(Debug, Clone, Copy)]
648enum SqlFlavor {
649 Sqlite,
650 Postgres,
651 MySql,
652}
653
654impl SqlFlavor {
655 fn literal(self, value: &str) -> String {
656 format!("'{}'", sql_literal(value))
657 }
658
659 fn column_text(self, row: &str, column: &str) -> String {
660 match self {
661 Self::Sqlite | Self::Postgres => format!("cast({row}.{column} as text)"),
662 Self::MySql => format!("cast({row}.{column} as char)"),
663 }
664 }
665
666 fn concat(self, parts: &[&str]) -> String {
667 match self {
668 Self::Sqlite | Self::Postgres => parts.join(" || "),
669 Self::MySql => format!("concat({})", parts.join(", ")),
670 }
671 }
672}
673
674fn validate_identifier(identifier: &str) -> Result<(), HookError> {
675 let valid = !identifier.is_empty()
676 && identifier
677 .chars()
678 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
679 && identifier
680 .chars()
681 .next()
682 .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_');
683 if valid {
684 Ok(())
685 } else {
686 Err(HookError::InvalidIdentifier(identifier.to_owned()))
687 }
688}
689
/// Escapes `value` for use inside a single-quoted SQL string by doubling every
/// single quote. The surrounding quotes are not added.
fn sql_literal(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
693
694impl fmt::Display for HookDialect {
695 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
696 formatter.write_str(match self {
697 Self::Postgres => "postgres",
698 Self::MySql => "mysql",
699 Self::Sqlite => "sqlite",
700 })
701 }
702}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema version reports the shared artifact/version constants plus
    /// the plan's own table and dialect.
    #[test]
    fn schema_version_names_table_and_dialect() {
        let HookSchemaVersion {
            artifact,
            version,
            table,
            dialect,
        } = HookPlan::sqlite("users").schema_version();

        assert_eq!(artifact, HOOK_SCHEMA_ARTIFACT);
        assert_eq!(version, HOOK_SCHEMA_VERSION);
        assert_eq!(table, "users");
        assert_eq!(dialect, HookDialect::Sqlite);
    }
}