StateStore

Trait StateStore 

Source
/// Trait for state storage backends.
///
/// Implementations should persist resume tokens durably to survive process
/// restarts. Besides resume-token storage, the trait also exposes TTL-based
/// lock operations keyed by a string and an owner id.
///
/// Every method returns a pinned, boxed `Send` future rather than being an
/// `async fn`. NOTE(review): the `'life*` / `'async_trait` lifetimes suggest
/// these signatures are the expansion of the `async_trait` macro — confirm
/// against the original source before editing them by hand.
pub trait StateStore {
    // Required methods

    /// Saves a resume token for a collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the token cannot be saved.
    fn save_resume_token<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        collection: &'life1 str,
        token: &'life2 Document,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    /// Retrieves the resume token for a collection.
    ///
    /// Resolves to `None` if no token exists for the collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the token cannot be retrieved.
    fn get_resume_token<'life0, 'life1, 'async_trait>(
        &'life0 self,
        collection: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Document>, StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Deletes the resume token for a collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the token cannot be deleted.
    fn delete_resume_token<'life0, 'life1, 'async_trait>(
        &'life0 self,
        collection: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;

    /// Lists all resume tokens.
    ///
    /// Resolves to a map of collection names to resume tokens.
    ///
    /// # Errors
    ///
    /// Returns an error if the tokens cannot be listed.
    fn list_resume_tokens<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Document>, StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Closes the state store, releasing any resources.
    ///
    /// # Errors
    ///
    /// Returns an error if the store cannot be closed cleanly.
    fn close<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    /// Tries to acquire a distributed lock with a TTL.
    ///
    /// Attempts to acquire an exclusive lock for `key`. The lock is held by
    /// `owner_id` and automatically expires after `ttl` if it is not
    /// refreshed or released.
    ///
    /// # Arguments
    ///
    /// * `key` - Lock identifier (e.g., `"rigatoni:lock:mydb:users"`)
    /// * `owner_id` - Unique identifier for this instance (e.g., UUID, hostname)
    /// * `ttl` - Time-to-live for the lock (auto-expires if the owner crashes)
    ///
    /// # Returns
    ///
    /// * `Ok(true)` - Lock acquired successfully
    /// * `Ok(false)` - Lock already held by another instance
    /// * `Err(_)` - Error communicating with the state store
    fn try_acquire_lock<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        key: &'life1 str,
        owner_id: &'life2 str,
        ttl: Duration,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    /// Refreshes an existing lock to extend its TTL.
    ///
    /// Intended to be called periodically (heartbeat) so the lock does not
    /// expire during normal operation. The refresh only succeeds if the lock
    /// is still held by `owner_id`.
    ///
    /// # Arguments
    ///
    /// * `key` - Lock identifier
    /// * `owner_id` - Unique identifier for this instance
    /// * `ttl` - New time-to-live for the lock
    ///
    /// # Returns
    ///
    /// * `Ok(true)` - Lock refreshed successfully
    /// * `Ok(false)` - Lock not held by this owner (acquired by someone else
    ///   or expired)
    /// * `Err(_)` - Error communicating with the state store
    fn refresh_lock<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        key: &'life1 str,
        owner_id: &'life2 str,
        ttl: Duration,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    /// Releases a lock.
    ///
    /// Releases the lock if it is held by `owner_id`. Intended to be called
    /// during graceful shutdown so other instances can take over immediately
    /// without waiting for TTL expiry.
    ///
    /// # Arguments
    ///
    /// * `key` - Lock identifier
    /// * `owner_id` - Unique identifier for this instance
    ///
    /// # Returns
    ///
    /// * `Ok(true)` - Lock released successfully
    /// * `Ok(false)` - Lock not held by this owner (already released or
    ///   acquired by another)
    /// * `Err(_)` - Error communicating with the state store
    fn release_lock<'life0, 'life1, 'life2, 'async_trait>(
        &'life0 self,
        key: &'life1 str,
        owner_id: &'life2 str,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait,
             'life2: 'async_trait;

    /// Checks whether a lock is held (by any instance).
    ///
    /// This is a read-only check for whether a lock exists. The result may be
    /// stale by the time it is used due to distributed system timing.
    ///
    /// # Arguments
    ///
    /// * `key` - Lock identifier
    ///
    /// # Returns
    ///
    /// * `Ok(true)` - Lock is currently held
    /// * `Ok(false)` - Lock is not held (available)
    /// * `Err(_)` - Error communicating with the state store
    fn is_locked<'life0, 'life1, 'async_trait>(
        &'life0 self,
        key: &'life1 str,
    ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
}
Expand description

Trait for state storage backends.

Implementations should persist resume tokens durably so that they survive process restarts.

Required Methods§

Source

fn save_resume_token<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, collection: &'life1 str, token: &'life2 Document, ) -> Pin<Box<dyn Future<Output = Result<(), StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Saves a resume token for a collection.

§Errors

Returns an error if the token cannot be saved.

Source

fn get_resume_token<'life0, 'life1, 'async_trait>( &'life0 self, collection: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<Document>, StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Retrieves the resume token for a collection.

Returns None if no token exists for the collection.

§Errors

Returns an error if the token cannot be retrieved.

Source

fn delete_resume_token<'life0, 'life1, 'async_trait>( &'life0 self, collection: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<(), StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Deletes the resume token for a collection.

§Errors

Returns an error if the token cannot be deleted.

Source

fn list_resume_tokens<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Document>, StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Lists all resume tokens.

Returns a map of collection names to resume tokens.

§Errors

Returns an error if the tokens cannot be listed.

Source

fn close<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Closes the state store, releasing any resources.

§Errors

Returns an error if the store cannot be closed cleanly.

Source

fn try_acquire_lock<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, key: &'life1 str, owner_id: &'life2 str, ttl: Duration, ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Try to acquire a distributed lock with TTL.

This method attempts to acquire an exclusive lock for the given key. The lock is held by the specified owner and will automatically expire after the TTL duration if not refreshed or released.

§Arguments
  • key - Lock identifier (e.g., "rigatoni:lock:mydb:users")
  • owner_id - Unique identifier for this instance (e.g., UUID, hostname)
  • ttl - Time-to-live for the lock (auto-expires if owner crashes)
§Returns
  • Ok(true) - Lock acquired successfully
  • Ok(false) - Lock already held by another instance
  • Err(_) - Error communicating with state store
§Example
use std::time::Duration;

let acquired = store.try_acquire_lock(
    "rigatoni:lock:mydb:users",
    "instance-abc123",
    Duration::from_secs(30),
).await?;

if acquired {
    println!("Lock acquired, safe to process");
} else {
    println!("Lock held by another instance");
}
Source

fn refresh_lock<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, key: &'life1 str, owner_id: &'life2 str, ttl: Duration, ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Refresh an existing lock to extend its TTL.

This method should be called periodically (heartbeat) to prevent the lock from expiring during normal operation. The refresh only succeeds if the lock is still held by the specified owner.

§Arguments
  • key - Lock identifier
  • owner_id - Unique identifier for this instance
  • ttl - New time-to-live for the lock
§Returns
  • Ok(true) - Lock refreshed successfully
  • Ok(false) - Lock not held by this owner (was acquired by someone else or expired)
  • Err(_) - Error communicating with state store
§Example
// Refresh lock every 10 seconds with 30 second TTL
let refreshed = store.refresh_lock(
    "rigatoni:lock:mydb:users",
    "instance-abc123",
    Duration::from_secs(30),
).await?;

if !refreshed {
    // Lost the lock - another instance took over
    panic!("Lost lock for collection");
}
Source

fn release_lock<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, key: &'life1 str, owner_id: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Release a lock.

This method releases the lock if it is held by the specified owner. It should be called during graceful shutdown so that other instances can take over immediately, without waiting for the TTL to expire.

§Arguments
  • key - Lock identifier
  • owner_id - Unique identifier for this instance
§Returns
  • Ok(true) - Lock released successfully
  • Ok(false) - Lock not held by this owner (already released or acquired by another)
  • Err(_) - Error communicating with state store
§Example
// Release lock on shutdown
let released = store.release_lock(
    "rigatoni:lock:mydb:users",
    "instance-abc123",
).await?;

if released {
    println!("Lock released, other instances can now acquire");
}
Source

fn is_locked<'life0, 'life1, 'async_trait>( &'life0 self, key: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<bool, StateStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Check if a lock is held (by any instance).

This is a read-only operation that checks whether a lock exists. Note: the result may already be stale by the time it is used, because another instance can acquire or release the lock, or the lock's TTL can expire, in the meantime.

§Arguments
  • key - Lock identifier
§Returns
  • Ok(true) - Lock is currently held
  • Ok(false) - Lock is not held (available)
  • Err(_) - Error communicating with state store

Implementors§