Skip to main content

SessionLockManager

Struct SessionLockManager 

Source
pub struct SessionLockManager { /* private fields */ }
Expand description

Manages session locks for concurrent message processing.

The lock manager coordinates exclusive access to sessions, ensuring that only one consumer processes messages from a session at a time. It handles lock acquisition, renewal, release, and automatic expiration cleanup.

§Thread Safety

This type is thread-safe and can be shared across async tasks using Arc.

§Example

use queue_runtime::sessions::SessionLockManager;
use queue_runtime::message::SessionId;
use std::time::Duration;

let manager = SessionLockManager::new(Duration::from_secs(30));
let session_id = SessionId::new("order-456".to_string()).unwrap();

// Acquire lock
let lock = manager.acquire_lock(session_id.clone(), "consumer-1".to_string()).await?;
assert_eq!(lock.owner(), "consumer-1");

// Try to acquire same session with different consumer - should fail
let result = manager.try_acquire_lock(session_id.clone(), "consumer-2".to_string()).await;
assert!(result.is_err());

// Release lock
manager.release_lock(&session_id, "consumer-1").await?;

Implementations§

Source§

impl SessionLockManager

Source

pub fn new(default_lock_duration: Duration) -> Self

Create a new session lock manager.

§Arguments
  • default_lock_duration - Default duration for session locks
§Example
use queue_runtime::sessions::SessionLockManager;
use std::time::Duration;

let manager = SessionLockManager::new(Duration::from_secs(60));
Source

pub async fn try_acquire_lock( &self, session_id: SessionId, owner: String, ) -> Result<SessionLock, QueueError>

Try to acquire a lock on a session (non-blocking).

Returns immediately with an error if the session is already locked by another consumer.

§Arguments
  • session_id - The session to lock
  • owner - Identifier of the consumer requesting the lock
§Returns

The acquired lock if successful, or an error if the session is currently locked by another consumer.

§Errors

Returns QueueError::SessionLocked if the session is already locked by another consumer and the lock has not expired.

Source

pub async fn acquire_lock( &self, session_id: SessionId, owner: String, ) -> Result<SessionLock, QueueError>

Acquire a lock on a session (blocking with timeout).

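Waits for the lock to become available if it’s currently held by another consumer, up to a timeout period. Note that this method takes no timeout argument, so the wait limit is determined internally rather than passed by the caller.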

§Arguments
  • session_id - The session to lock
  • owner - Identifier of the consumer requesting the lock
§Returns

The acquired lock if successful within the timeout period

§Errors

Returns QueueError::SessionLocked if unable to acquire the lock within the timeout period.

Source

pub async fn renew_lock( &self, session_id: &SessionId, owner: &str, extension: Option<Duration>, ) -> Result<SessionLock, QueueError>

Renew an existing session lock.

Extends the lock’s expiration time, allowing the consumer to continue processing messages from the session.

§Arguments
  • session_id - The session whose lock to renew
  • owner - Identifier of the consumer owning the lock
  • extension - How long to extend the lock by (if None, uses default duration)
§Returns

The renewed lock with updated expiration time

§Errors

Returns QueueError::SessionNotFound if no lock exists for the session. Returns QueueError::SessionLocked if the lock is owned by a different consumer.

Source

pub async fn release_lock( &self, session_id: &SessionId, owner: &str, ) -> Result<(), QueueError>

Release a session lock.

Removes the lock, allowing other consumers to acquire it.

§Arguments
  • session_id - The session whose lock to release
  • owner - Identifier of the consumer releasing the lock
§Returns

Ok(()) if the lock was successfully released

§Errors

Returns QueueError::SessionNotFound if no lock exists for the session. Returns QueueError::SessionLocked if the lock is owned by a different consumer.

Source

pub async fn is_locked(&self, session_id: &SessionId) -> bool

Check if a session is currently locked.

§Arguments
  • session_id - The session to check
§Returns

true if the session has a valid (non-expired) lock, false otherwise.

Source

pub async fn get_lock(&self, session_id: &SessionId) -> Option<SessionLock>

Get information about a session lock.

§Arguments
  • session_id - The session to query
§Returns

Some containing the lock information if a lock exists and is not expired; None otherwise.

Source

pub async fn cleanup_expired_locks(&self) -> usize

Clean up expired locks.

Removes all locks that have passed their expiration time.

§Returns

The number of expired locks that were removed

Source

pub async fn lock_count(&self) -> usize

Get the total number of locks tracked by the manager, including expired locks that have not yet been cleaned up.

§Returns

Total number of locks in the manager

Source

pub async fn active_lock_count(&self) -> usize

Get the number of active (non-expired) locks.

§Returns

Number of locks that have not expired

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more