cruster 0.0.27

A Rust framework for building distributed, stateful entity systems with durable workflows
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
//! SQL-backed workflow journal storage using PostgreSQL via sqlx.
//!
//! Persists workflow journal entries (idempotency cache) so that
//! workflow execution results and deferred values survive entity
//! restarts and runner crashes.
//!
//! This module is only available when the `sql` feature is enabled.

use async_trait::async_trait;
use chrono::Utc;
use sqlx::postgres::{PgPool, PgQueryResult, PgRow, Postgres};
use sqlx::{Row, Transaction};
use std::any::Any;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::durable::{StorageTransaction, WorkflowStorage};
use crate::error::ClusterError;

/// Compute the exclusive upper bound for a prefix range query.
///
/// Returns a string `bound` such that, under byte-wise (`COLLATE "C"`)
/// comparison, `key >= prefix AND key < bound` selects exactly the keys that
/// start with `prefix`.
///
/// The bound is built by replacing the *last character* of `prefix` with the
/// next Unicode scalar value. UTF-8 byte order matches code-point order, so
/// this is a tight bound. It is also always valid UTF-8: a Rust `String`
/// cannot hold anything else, and a UTF-8 PostgreSQL database rejects invalid
/// byte sequences.
///
/// Incrementing individual bytes instead would break multi-byte characters.
/// Backing off from that produces bounds that are too loose: for `"aÿ"` it
/// would yield `"b"`, which also admits `"aĀ…"`.
///
/// Trailing U+10FFFF (`char::MAX`) characters cannot be incremented. They are
/// dropped, and the preceding character is incremented instead.
///
/// Returns `None` when `prefix` is empty or consists entirely of U+10FFFF. In
/// both cases `key >= prefix` alone already selects exactly the prefixed keys.
/// The function name is kept for compatibility; it operates on characters,
/// not bytes.
fn increment_last_byte(prefix: &str) -> Option<String> {
    for (start, last) in prefix.char_indices().rev() {
        // Next scalar value, skipping the surrogate range D800..=DFFF, which
        // `char` cannot represent. `char::MAX + 1` yields `None`.
        let next = match last {
            '\u{D7FF}' => Some('\u{E000}'),
            c => char::from_u32(u32::from(c) + 1),
        };
        if let Some(next) = next {
            let mut bound = String::with_capacity(start + next.len_utf8());
            bound.push_str(&prefix[..start]);
            bound.push(next);
            return Some(bound);
        }
        // `last` is char::MAX: drop it and try to increment the previous character.
    }
    None
}

/// PostgreSQL-backed workflow journal storage.
///
/// Entries live in the `cluster_workflow_journal` table. The type is cheap to
/// clone because it only wraps a [`PgPool`] handle, which is itself a shared
/// pool handle.
#[derive(Clone, Debug)]
pub struct SqlWorkflowStorage {
    /// Connection pool used for every non-transactional operation.
    pool: PgPool,
}

impl SqlWorkflowStorage {
    /// Create a new SQL workflow journal storage with the given connection pool.
    ///
    /// Run [`crate::storage::migrate`] before using SQL storage backends.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

#[async_trait]
impl WorkflowStorage for SqlWorkflowStorage {
    #[tracing::instrument(level = "debug", skip(self))]
    async fn load(&self, key: &str) -> Result<Option<Vec<u8>>, ClusterError> {
        let row = sqlx::query("SELECT value FROM cluster_workflow_journal WHERE key = $1")
            .bind(key)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("workflow storage load failed: {e}"),
                source: Some(Box::new(e)),
            })?;

        match row {
            Some(r) => {
                let value: Vec<u8> =
                    r.try_get("value")
                        .map_err(|e| ClusterError::PersistenceError {
                            reason: format!(
                                "workflow storage load: failed to read 'value' column: {e}"
                            ),
                            source: Some(Box::new(e)),
                        })?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    #[tracing::instrument(level = "debug", skip(self, value))]
    async fn save(&self, key: &str, value: &[u8]) -> Result<(), ClusterError> {
        sqlx::query(
            "INSERT INTO cluster_workflow_journal (key, value, updated_at)
             VALUES ($1, $2, NOW())
             ON CONFLICT (key) DO UPDATE SET value = $2, updated_at = NOW()",
        )
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await
        .map_err(|e| ClusterError::PersistenceError {
            reason: format!("workflow storage save failed: {e}"),
            source: Some(Box::new(e)),
        })?;

        Ok(())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn delete(&self, key: &str) -> Result<(), ClusterError> {
        sqlx::query("DELETE FROM cluster_workflow_journal WHERE key = $1")
            .bind(key)
            .execute(&self.pool)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("workflow storage delete failed: {e}"),
                source: Some(Box::new(e)),
            })?;

        Ok(())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn mark_completed(&self, key: &str) -> Result<(), ClusterError> {
        sqlx::query("UPDATE cluster_workflow_journal SET completed_at = NOW() WHERE key = $1")
            .bind(key)
            .execute(&self.pool)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("workflow storage mark_completed failed: {e}"),
                source: Some(Box::new(e)),
            })?;
        Ok(())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn cleanup(&self, older_than: std::time::Duration) -> Result<u64, ClusterError> {
        let cutoff =
            Utc::now() - chrono::Duration::from_std(older_than).unwrap_or(chrono::TimeDelta::MAX);
        let result = sqlx::query(
            "DELETE FROM cluster_workflow_journal WHERE completed_at IS NOT NULL AND completed_at < $1",
        )
        .bind(cutoff)
        .execute(&self.pool)
        .await
        .map_err(|e| ClusterError::PersistenceError {
            reason: format!("workflow storage cleanup failed: {e}"),
            source: Some(Box::new(e)),
        })?;
        Ok(result.rows_affected())
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, ClusterError> {
        // Use a range query instead of LIKE for reliable btree index usage.
        // LIKE with bind parameters and ESCAPE may not use text_pattern_ops
        // indexes on non-C locale databases. Range queries always use btree.
        // COLLATE "C" forces byte-order comparison, ensuring correct results
        // for non-ASCII keys on databases with non-C default collation.
        let range_end = increment_last_byte(prefix);
        let rows = match &range_end {
            Some(end) => {
                sqlx::query(
                    "SELECT key FROM cluster_workflow_journal WHERE key COLLATE \"C\" >= $1 AND key COLLATE \"C\" < $2 ORDER BY key ASC",
                )
                .bind(prefix)
                .bind(end)
                .fetch_all(&self.pool)
                .await
            }
            None => {
                // Prefix is empty or all 0xFF bytes — match everything from prefix onward
                sqlx::query(
                    "SELECT key FROM cluster_workflow_journal WHERE key COLLATE \"C\" >= $1 ORDER BY key ASC",
                )
                .bind(prefix)
                .fetch_all(&self.pool)
                .await
            }
        }
        .map_err(|e| ClusterError::PersistenceError {
            reason: format!("workflow storage list_keys failed: {e}"),
            source: Some(Box::new(e)),
        })?;

        rows.iter()
            .map(|r| {
                r.try_get("key")
                    .map_err(|e| ClusterError::PersistenceError {
                        reason: format!(
                            "workflow storage list_keys: failed to read 'key' column: {e}"
                        ),
                        source: Some(Box::new(e)),
                    })
            })
            .collect()
    }

    #[tracing::instrument(level = "debug", skip(self))]
    async fn begin_transaction(&self) -> Result<Box<dyn StorageTransaction>, ClusterError> {
        let tx = self
            .pool
            .begin()
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("failed to begin transaction: {e}"),
                source: Some(Box::new(e)),
            })?;

        Ok(Box::new(SqlJournalTransaction {
            tx: Arc::new(Mutex::new(Some(tx))),
        }))
    }

    fn as_arc(&self) -> Arc<dyn WorkflowStorage> {
        // This is only called by the default begin_transaction impl,
        // which we override above. So this should never be called.
        panic!("SqlWorkflowStorage::as_arc() should not be called")
    }

    fn sql_pool(&self) -> Option<&PgPool> {
        Some(&self.pool)
    }
}

/// A PostgreSQL transaction for workflow storage.
///
/// All operations are performed within the transaction and only become
/// visible when `commit()` is called. If the transaction is dropped
/// without committing, all operations are rolled back.
pub struct SqlJournalTransaction {
    /// Slot holding the open transaction behind an async (tokio) mutex, so the
    /// guard can be held across `.await` while a statement runs.
    /// It becomes `None` once `commit()` or `rollback()` takes the transaction out.
    tx: Arc<Mutex<Option<Transaction<'static, Postgres>>>>,
}

#[async_trait]
impl StorageTransaction for SqlJournalTransaction {
    /// Upsert `value` under `key` inside the open transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction was already finished or the statement errors.
    async fn save(&mut self, key: &str, value: &[u8]) -> Result<(), ClusterError> {
        let mut slot = self.tx.lock().await;
        let Some(open) = slot.as_mut() else {
            return Err(ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            });
        };

        let upsert = sqlx::query(
            "INSERT INTO cluster_workflow_journal (key, value, updated_at)
             VALUES ($1, $2, NOW())
             ON CONFLICT (key) DO UPDATE SET value = $2, updated_at = NOW()",
        );
        upsert
            .bind(key)
            .bind(value)
            .execute(&mut **open)
            .await
            .map(|_| ())
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("transaction save failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Delete the row for `key` inside the open transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction was already finished or the statement errors.
    async fn delete(&mut self, key: &str) -> Result<(), ClusterError> {
        let mut slot = self.tx.lock().await;
        let Some(open) = slot.as_mut() else {
            return Err(ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            });
        };

        sqlx::query("DELETE FROM cluster_workflow_journal WHERE key = $1")
            .bind(key)
            .execute(&mut **open)
            .await
            .map(|_| ())
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("transaction delete failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Take the transaction out of its slot and commit it.
    ///
    /// # Errors
    ///
    /// Fails if the transaction was already finished or the commit errors.
    async fn commit(self: Box<Self>) -> Result<(), ClusterError> {
        let mut slot = self.tx.lock().await;
        let Some(open) = slot.take() else {
            return Err(ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            });
        };

        open.commit()
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("transaction commit failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Take the transaction out of its slot and roll it back.
    ///
    /// # Errors
    ///
    /// Fails if the transaction was already finished or the rollback errors.
    async fn rollback(self: Box<Self>) -> Result<(), ClusterError> {
        let mut slot = self.tx.lock().await;
        let Some(open) = slot.take() else {
            return Err(ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            });
        };

        open.rollback()
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("transaction rollback failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Allow downcasting to `SqlJournalTransaction` from a trait object.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

impl SqlJournalTransaction {
    /// Execute a raw SQL query within this transaction.
    ///
    /// This allows activities to run arbitrary SQL statements that will be
    /// committed or rolled back together with entity state changes.
    ///
    /// # Example
    ///
    /// ```text
    /// #[activity]
    /// async fn transfer(&mut self, to: String, amount: i64) -> Result<(), ClusterError> {
    ///     self.state.balance -= amount;
    ///     
    ///     if let Some(tx) = ActivityScope::sql_transaction().await {
    ///         tx.execute(
    ///             sqlx::query("INSERT INTO transfers (from_id, to_id, amount) VALUES ($1, $2, $3)")
    ///                 .bind(&self.id)
    ///                 .bind(&to)
    ///                 .bind(amount)
    ///         ).await?;
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub async fn execute(
        &self,
        query: sqlx::query::Query<'_, Postgres, sqlx::postgres::PgArguments>,
    ) -> Result<PgQueryResult, ClusterError> {
        let mut guard = self.tx.lock().await;
        let tx = guard
            .as_mut()
            .ok_or_else(|| ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            })?;

        query
            .execute(&mut **tx)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("SQL execute failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Fetch a single row from a SQL query within this transaction.
    pub async fn fetch_one<'q, O>(
        &self,
        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
    ) -> Result<O, ClusterError>
    where
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
    {
        let mut guard = self.tx.lock().await;
        let tx = guard
            .as_mut()
            .ok_or_else(|| ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            })?;

        query
            .fetch_one(&mut **tx)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("SQL fetch_one failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Fetch an optional row from a SQL query within this transaction.
    pub async fn fetch_optional<'q, O>(
        &self,
        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
    ) -> Result<Option<O>, ClusterError>
    where
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
    {
        let mut guard = self.tx.lock().await;
        let tx = guard
            .as_mut()
            .ok_or_else(|| ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            })?;

        query
            .fetch_optional(&mut **tx)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("SQL fetch_optional failed: {e}"),
                source: Some(Box::new(e)),
            })
    }

    /// Fetch all rows from a SQL query within this transaction.
    pub async fn fetch_all<'q, O>(
        &self,
        query: sqlx::query::QueryAs<'q, Postgres, O, sqlx::postgres::PgArguments>,
    ) -> Result<Vec<O>, ClusterError>
    where
        O: Send + Unpin + for<'r> sqlx::FromRow<'r, PgRow>,
    {
        let mut guard = self.tx.lock().await;
        let tx = guard
            .as_mut()
            .ok_or_else(|| ClusterError::PersistenceError {
                reason: "transaction already committed or rolled back".to_string(),
                source: None,
            })?;

        query
            .fetch_all(&mut **tx)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: format!("SQL fetch_all failed: {e}"),
                source: Some(Box::new(e)),
            })
    }
}

/// Upsert a workflow journal entry on an already-open connection.
///
/// Macro-generated activity dispatch code uses this to write the journal
/// entry through the activity's SQL transaction (`self.tx`). The journal
/// write and any user SQL therefore execute in the same transaction.
///
/// # Errors
///
/// Returns `ClusterError::PersistenceError` if the upsert fails.
pub async fn save_journal_entry(
    conn: &mut sqlx::PgConnection,
    key: &str,
    value: &[u8],
) -> Result<(), ClusterError> {
    // Insert the entry, or overwrite the value of an existing key and bump
    // its `updated_at` timestamp.
    const UPSERT_SQL: &str = "INSERT INTO cluster_workflow_journal (key, value, updated_at)
         VALUES ($1, $2, NOW())
         ON CONFLICT (key) DO UPDATE SET value = $2, updated_at = NOW()";

    let outcome = sqlx::query(UPSERT_SQL)
        .bind(key)
        .bind(value)
        .execute(conn)
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(e) => Err(ClusterError::PersistenceError {
            reason: format!("journal entry save failed: {e}"),
            source: Some(Box::new(e)),
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sql_workflow_storage_is_send_sync() {
        // Compile-time check: the storage must be shareable across tasks.
        fn require_send_sync<S: Send + Sync>() {}
        require_send_sync::<SqlWorkflowStorage>();
    }

    #[test]
    fn increment_last_byte_normal() {
        let bound = increment_last_byte("abc");
        assert_eq!(bound.as_deref(), Some("abd"));
    }

    #[test]
    fn increment_last_byte_trailing_high_byte() {
        // 'z' (0x7A) is followed by '{' (0x7B).
        assert_eq!(increment_last_byte("az").as_deref(), Some("a{"));
    }

    #[test]
    fn increment_last_byte_empty() {
        assert!(increment_last_byte("").is_none());
    }

    #[test]
    fn increment_last_byte_prefix_query_semantics() {
        // '/' (0x2F) is followed by '0' (0x30), so the range
        // ["result/", "result0") covers every key starting with "result/".
        assert_eq!(increment_last_byte("result/").as_deref(), Some("result0"));
    }
}