Skip to main content

ainl_runtime/
graph_cell.rs

1//! Graph memory holder: direct [`GraphMemory`] by default, or `Arc<std::sync::Mutex<GraphMemory>>`
2//! when the `async` feature is enabled (shared with `tokio::task::spawn_blocking` in `run_turn_async`).
3//!
4//! We use [`std::sync::Mutex`] (not `tokio::sync::Mutex`) so [`crate::AinlRuntime::new`] and
5//! [`crate::AinlRuntime::sqlite_store`] can take short locks on **any** thread—including a Tokio
6//! worker running `#[tokio::test]`—without the failure mode of locking a `tokio::sync::Mutex` from
7//! a context where Tokio treats it as blocking the executor. Heavy graph / SQLite work in
8//! `AinlRuntime::run_turn_async` still runs on the blocking pool (`tokio::task::spawn_blocking`).
9
10use std::ops::Deref;
11#[cfg(feature = "async")]
12use std::sync::{Arc, Mutex, MutexGuard};
13
14use ainl_memory::{GraphMemory, RuntimeStateNode, SqliteGraphStore};
15
16/// Borrowed view of the backing SQLite store (see [`crate::AinlRuntime::sqlite_store`]).
17pub struct SqliteStoreRef<'a> {
18    #[cfg(not(feature = "async"))]
19    inner: &'a SqliteGraphStore,
20    #[cfg(feature = "async")]
21    _guard: MutexGuard<'a, GraphMemory>,
22}
23
24impl<'a> SqliteStoreRef<'a> {
25    #[cfg(not(feature = "async"))]
26    pub(crate) fn borrowed(store: &'a SqliteGraphStore) -> Self {
27        Self { inner: store }
28    }
29
30    #[cfg(feature = "async")]
31    pub(crate) fn from_guard(guard: MutexGuard<'a, GraphMemory>) -> Self {
32        Self { _guard: guard }
33    }
34}
35
36impl Deref for SqliteStoreRef<'_> {
37    type Target = SqliteGraphStore;
38
39    fn deref(&self) -> &Self::Target {
40        #[cfg(not(feature = "async"))]
41        {
42            self.inner
43        }
44        #[cfg(feature = "async")]
45        {
46            self._guard.sqlite_store()
47        }
48    }
49}
50
51pub(crate) struct GraphCell {
52    #[cfg(not(feature = "async"))]
53    inner: GraphMemory,
54    #[cfg(feature = "async")]
55    inner: Arc<Mutex<GraphMemory>>,
56}
57
58impl GraphCell {
59    pub(crate) fn new(store: SqliteGraphStore) -> Self {
60        let memory = GraphMemory::from_sqlite_store(store);
61        #[cfg(not(feature = "async"))]
62        {
63            Self { inner: memory }
64        }
65        #[cfg(feature = "async")]
66        {
67            Self {
68                inner: Arc::new(Mutex::new(memory)),
69            }
70        }
71    }
72
73    pub(crate) fn read_runtime_state(
74        &self,
75        agent_id: &str,
76    ) -> Result<Option<RuntimeStateNode>, String> {
77        self.with(|m| m.read_runtime_state(agent_id))
78    }
79
80    #[cfg(not(feature = "async"))]
81    pub(crate) fn with<R, F: FnOnce(&GraphMemory) -> R>(&self, f: F) -> R {
82        f(&self.inner)
83    }
84
85    #[cfg(feature = "async")]
86    pub(crate) fn with<R, F: FnOnce(&GraphMemory) -> R>(&self, f: F) -> R {
87        let g = self.inner.lock().expect("graph mutex poisoned");
88        f(&g)
89    }
90
91    pub(crate) fn sqlite_ref(&self) -> SqliteStoreRef<'_> {
92        #[cfg(not(feature = "async"))]
93        {
94            SqliteStoreRef::borrowed(self.inner.sqlite_store())
95        }
96        #[cfg(feature = "async")]
97        {
98            SqliteStoreRef::from_guard(self.inner.lock().expect("graph mutex"))
99        }
100    }
101
102    #[cfg(feature = "async")]
103    pub(crate) fn shared_arc(&self) -> Arc<Mutex<GraphMemory>> {
104        Arc::clone(&self.inner)
105    }
106}