Skip to main content

everruns_local/
backends.rs

1// LocalBackends — composable construction of runtime backends + local stores.
2//
3// Composability requirement (EVE-594): an embedder MUST be able to keep its own
4// event bus and its own `SessionFileSystemFactory`. `LocalBackends` therefore
5// takes the runtime backend bundle (which already carries the caller's event
6// bus) as input, and the file system factory is configured on the embedder's
7// `PlatformDefinition` — never forced here. The individual local stores are
8// exposed so an embedder can wire only the pieces it wants.
9
10use std::sync::Arc;
11
12use everruns_core::error::Result;
13use everruns_core::platform_store::PlatformStore;
14use everruns_core::traits::SessionScheduleStore;
15use everruns_core::typed_id::{PrincipalId, SessionId};
16use everruns_runtime::{PlatformStoreFactory, RuntimeBackends, ScheduleStoreFactory};
17
18use crate::db::SqliteDb;
19use crate::platform_store::{LocalPlatformStore, LocalSessionRunner};
20use crate::profile::LocalProfile;
21use crate::schedule_store::LocalScheduleStore;
22use crate::task_registry::LocalSessionTaskRegistry;
23
24/// The local stores plus a ready-to-use `RuntimeBackends`.
25///
26/// `runtime_backends` carries whatever store set / event bus the caller passed
27/// in, plus the local SQLite-backed task registry and schedule store factory
28/// attached here. The platform store factory is attached separately via
29/// [`LocalBackends::with_platform_runner`] once the embedder supplies a
30/// `LocalSessionRunner` (the runner usually wraps the runtime built from these
31/// backends, hence the two-step wiring).
32pub struct LocalBackends {
33    /// Composable runtime backend bundle (caller bus preserved).
34    pub runtime_backends: RuntimeBackends,
35    /// Shared SQLite handle backing the local stores.
36    pub db: SqliteDb,
37    /// SQLite-backed task registry (also installed on `runtime_backends`).
38    pub task_registry: Arc<LocalSessionTaskRegistry>,
39    /// Profile used to build the stores.
40    pub profile: LocalProfile,
41    org_id: i64,
42}
43
44impl LocalBackends {
45    /// Build local backends from a profile and caller-provided composable
46    /// pieces. The event bus and any custom stores already configured on
47    /// `runtime_backends` are preserved; this only attaches the local task
48    /// registry and schedule store factory.
49    ///
50    /// The SQLite database is opened at `profile.db_path()`; the data directory
51    /// is created if needed.
52    pub fn new(profile: LocalProfile, runtime_backends: RuntimeBackends) -> Result<Self> {
53        profile
54            .ensure_dirs()
55            .map_err(|e| everruns_core::AgentLoopError::config(e.to_string()))?;
56        let db = SqliteDb::open(profile.db_path()).map_err(everruns_core::AgentLoopError::from)?;
57        Self::with_db(profile, runtime_backends, db)
58    }
59
60    /// Build local backends over an already-open SQLite handle (e.g. an
61    /// in-memory DB for tests).
62    pub fn with_db(
63        profile: LocalProfile,
64        runtime_backends: RuntimeBackends,
65        db: SqliteDb,
66    ) -> Result<Self> {
67        let org_id = everruns_runtime::in_process_internal_org_id(&profile.org_public_id);
68        let task_registry = Arc::new(LocalSessionTaskRegistry::new(db.clone())?);
69
70        // Ensure the schedule schema exists once up front (propagating any
71        // error), so the per-(org) factory on the act path stays cheap and
72        // cannot panic. `new` here both creates the schema and validates the DB.
73        LocalScheduleStore::new(db.clone(), org_id, profile.owner_principal_id)?;
74        let schedule_db = db.clone();
75        let owner = profile.owner_principal_id;
76        let schedule_factory: ScheduleStoreFactory = Arc::new(move |org_id: i64| {
77            // One schedule store per org, over the shared DB handle. Schema is
78            // already initialized above, so this is infallible.
79            Arc::new(LocalScheduleStore::scoped(
80                schedule_db.clone(),
81                org_id,
82                owner,
83            )) as Arc<dyn SessionScheduleStore>
84        });
85
86        let runtime_backends = runtime_backends
87            .with_session_task_registry(task_registry.clone())
88            .with_schedule_store_factory(schedule_factory);
89
90        Ok(Self {
91            runtime_backends,
92            db,
93            task_registry,
94            profile,
95            org_id,
96        })
97    }
98
99    /// Build a `LocalScheduleStore` directly (for the embedder that wants the
100    /// concrete type and its additive metadata methods).
101    pub fn schedule_store(&self) -> Result<LocalScheduleStore> {
102        LocalScheduleStore::new(
103            self.db.clone(),
104            self.org_id,
105            self.profile.owner_principal_id,
106        )
107    }
108
109    /// Attach a platform store factory built from a caller-supplied
110    /// `LocalSessionRunner`. Call this after the runtime (and therefore the
111    /// runner) exists. The factory hands out a [`LocalPlatformStore`] per
112    /// (org, session); the runner is the source of truth for local session
113    /// state, so no extra store handles are required.
114    pub fn with_platform_runner(mut self, runner: Arc<dyn LocalSessionRunner>) -> Self {
115        let base_url = self.profile.base_url.clone();
116        let factory: PlatformStoreFactory =
117            Arc::new(move |_org_id: i64, _session_id: SessionId| {
118                Arc::new(LocalPlatformStore::new(runner.clone(), base_url.clone()))
119                    as Arc<dyn PlatformStore>
120            });
121        self.runtime_backends = self.runtime_backends.with_platform_store_factory(factory);
122        self
123    }
124
125    /// Internal org id this backend set is scoped to.
126    pub fn org_id(&self) -> i64 {
127        self.org_id
128    }
129
130    pub fn owner_principal_id(&self) -> PrincipalId {
131        self.profile.owner_principal_id
132    }
133}