Skip to main content

ainl_runtime/
graph_cell.rs

1//! Graph memory holder: direct [`GraphMemory`] by default, or `Arc<std::sync::Mutex<GraphMemory>>`
2//! when the `async` feature is enabled (shared with `tokio::task::spawn_blocking` in `run_turn_async`).
3//!
4//! We use [`std::sync::Mutex`] (not `tokio::sync::Mutex`) so [`crate::AinlRuntime::new`] and
5//! [`crate::AinlRuntime::sqlite_store`] can take short locks on **any** thread—including a Tokio
6//! worker running `#[tokio::test]`—without the failure mode of locking a `tokio::sync::Mutex` from
7//! a context where Tokio treats it as blocking the executor. Heavy graph / SQLite work in
8//! `AinlRuntime::run_turn_async` still runs on the blocking pool (`tokio::task::spawn_blocking`).
9
10use std::ops::Deref;
11#[cfg(feature = "async")]
12use std::sync::{Arc, Mutex, MutexGuard};
13
14use ainl_memory::{GraphMemory, RuntimeStateNode, SqliteGraphStore};
15
16/// Borrowed view of the backing SQLite store (see [`crate::AinlRuntime::sqlite_store`]).
17pub struct SqliteStoreRef<'a> {
18    #[cfg(not(feature = "async"))]
19    inner: &'a SqliteGraphStore,
20    #[cfg(feature = "async")]
21    _guard: MutexGuard<'a, GraphMemory>,
22}
23
24impl<'a> SqliteStoreRef<'a> {
25    #[cfg(not(feature = "async"))]
26    pub(crate) fn borrowed(store: &'a SqliteGraphStore) -> Self {
27        Self { inner: store }
28    }
29
30    #[cfg(feature = "async")]
31    pub(crate) fn from_guard(guard: MutexGuard<'a, GraphMemory>) -> Self {
32        Self { _guard: guard }
33    }
34}
35
36impl Deref for SqliteStoreRef<'_> {
37    type Target = SqliteGraphStore;
38
39    fn deref(&self) -> &Self::Target {
40        #[cfg(not(feature = "async"))]
41        {
42            self.inner
43        }
44        #[cfg(feature = "async")]
45        {
46            self._guard.sqlite_store()
47        }
48    }
49}
50
51pub(crate) struct GraphCell {
52    #[cfg(not(feature = "async"))]
53    inner: GraphMemory,
54    #[cfg(feature = "async")]
55    inner: Arc<Mutex<GraphMemory>>,
56}
57
58impl GraphCell {
59    pub(crate) fn new(store: SqliteGraphStore) -> Self {
60        let memory = GraphMemory::from_sqlite_store(store);
61        #[cfg(not(feature = "async"))]
62        {
63            Self { inner: memory }
64        }
65        #[cfg(feature = "async")]
66        {
67            Self {
68                inner: Arc::new(Mutex::new(memory)),
69            }
70        }
71    }
72
73    pub(crate) fn read_runtime_state(&self, agent_id: &str) -> Result<Option<RuntimeStateNode>, String> {
74        self.with(|m| m.read_runtime_state(agent_id))
75    }
76
77    #[cfg(not(feature = "async"))]
78    pub(crate) fn with<R, F: FnOnce(&GraphMemory) -> R>(&self, f: F) -> R {
79        f(&self.inner)
80    }
81
82    #[cfg(feature = "async")]
83    pub(crate) fn with<R, F: FnOnce(&GraphMemory) -> R>(&self, f: F) -> R {
84        let g = self.inner.lock().expect("graph mutex poisoned");
85        f(&g)
86    }
87
88    pub(crate) fn sqlite_ref(&self) -> SqliteStoreRef<'_> {
89        #[cfg(not(feature = "async"))]
90        {
91            SqliteStoreRef::borrowed(self.inner.sqlite_store())
92        }
93        #[cfg(feature = "async")]
94        {
95            SqliteStoreRef::from_guard(
96                self.inner
97                    .lock()
98                    .expect("graph mutex"),
99            )
100        }
101    }
102
103    #[cfg(feature = "async")]
104    pub(crate) fn shared_arc(&self) -> Arc<Mutex<GraphMemory>> {
105        Arc::clone(&self.inner)
106    }
107}