everruns_local/backends.rs
1// LocalBackends — composable construction of runtime backends + local stores.
2//
3// Composability requirement (EVE-594): an embedder MUST be able to keep its own
4// event bus and its own `SessionFileSystemFactory`. `LocalBackends` therefore
5// takes the runtime backend bundle (which already carries the caller's event
6// bus) as input, and the file system factory is configured on the embedder's
7// `PlatformDefinition` — never forced here. The individual local stores are
8// exposed so an embedder can wire only the pieces it wants.
9
10use std::sync::Arc;
11
12use everruns_core::error::Result;
13use everruns_core::platform_store::PlatformStore;
14use everruns_core::traits::SessionScheduleStore;
15use everruns_core::typed_id::{PrincipalId, SessionId};
16use everruns_runtime::{PlatformStoreFactory, RuntimeBackends, ScheduleStoreFactory};
17
18use crate::db::SqliteDb;
19use crate::platform_store::{LocalPlatformStore, LocalSessionRunner};
20use crate::profile::LocalProfile;
21use crate::schedule_store::LocalScheduleStore;
22use crate::task_registry::LocalSessionTaskRegistry;
23
24/// The local stores plus a ready-to-use `RuntimeBackends`.
25///
26/// `runtime_backends` carries whatever store set / event bus the caller passed
27/// in, plus the local SQLite-backed task registry and schedule store factory
28/// attached here. The platform store factory is attached separately via
29/// [`LocalBackends::with_platform_runner`] once the embedder supplies a
30/// `LocalSessionRunner` (the runner usually wraps the runtime built from these
31/// backends, hence the two-step wiring).
32pub struct LocalBackends {
33 /// Composable runtime backend bundle (caller bus preserved).
34 pub runtime_backends: RuntimeBackends,
35 /// Shared SQLite handle backing the local stores.
36 pub db: SqliteDb,
37 /// SQLite-backed task registry (also installed on `runtime_backends`).
38 pub task_registry: Arc<LocalSessionTaskRegistry>,
39 /// Profile used to build the stores.
40 pub profile: LocalProfile,
41 org_id: i64,
42}
43
44impl LocalBackends {
45 /// Build local backends from a profile and caller-provided composable
46 /// pieces. The event bus and any custom stores already configured on
47 /// `runtime_backends` are preserved; this only attaches the local task
48 /// registry and schedule store factory.
49 ///
50 /// The SQLite database is opened at `profile.db_path()`; the data directory
51 /// is created if needed.
52 pub fn new(profile: LocalProfile, runtime_backends: RuntimeBackends) -> Result<Self> {
53 profile
54 .ensure_dirs()
55 .map_err(|e| everruns_core::AgentLoopError::config(e.to_string()))?;
56 let db = SqliteDb::open(profile.db_path()).map_err(everruns_core::AgentLoopError::from)?;
57 Self::with_db(profile, runtime_backends, db)
58 }
59
60 /// Build local backends over an already-open SQLite handle (e.g. an
61 /// in-memory DB for tests).
62 pub fn with_db(
63 profile: LocalProfile,
64 runtime_backends: RuntimeBackends,
65 db: SqliteDb,
66 ) -> Result<Self> {
67 let org_id = everruns_runtime::in_process_internal_org_id(&profile.org_public_id);
68 let task_registry = Arc::new(LocalSessionTaskRegistry::new(db.clone())?);
69
70 // Ensure the schedule schema exists once up front (propagating any
71 // error), so the per-(org) factory on the act path stays cheap and
72 // cannot panic. `new` here both creates the schema and validates the DB.
73 LocalScheduleStore::new(db.clone(), org_id, profile.owner_principal_id)?;
74 let schedule_db = db.clone();
75 let owner = profile.owner_principal_id;
76 let schedule_factory: ScheduleStoreFactory = Arc::new(move |org_id: i64| {
77 // One schedule store per org, over the shared DB handle. Schema is
78 // already initialized above, so this is infallible.
79 Arc::new(LocalScheduleStore::scoped(
80 schedule_db.clone(),
81 org_id,
82 owner,
83 )) as Arc<dyn SessionScheduleStore>
84 });
85
86 let runtime_backends = runtime_backends
87 .with_session_task_registry(task_registry.clone())
88 .with_schedule_store_factory(schedule_factory);
89
90 Ok(Self {
91 runtime_backends,
92 db,
93 task_registry,
94 profile,
95 org_id,
96 })
97 }
98
99 /// Build a `LocalScheduleStore` directly (for the embedder that wants the
100 /// concrete type and its additive metadata methods).
101 pub fn schedule_store(&self) -> Result<LocalScheduleStore> {
102 LocalScheduleStore::new(
103 self.db.clone(),
104 self.org_id,
105 self.profile.owner_principal_id,
106 )
107 }
108
109 /// Attach a platform store factory built from a caller-supplied
110 /// `LocalSessionRunner`. Call this after the runtime (and therefore the
111 /// runner) exists. The factory hands out a [`LocalPlatformStore`] per
112 /// (org, session); the runner is the source of truth for local session
113 /// state, so no extra store handles are required.
114 pub fn with_platform_runner(mut self, runner: Arc<dyn LocalSessionRunner>) -> Self {
115 let base_url = self.profile.base_url.clone();
116 let factory: PlatformStoreFactory =
117 Arc::new(move |_org_id: i64, _session_id: SessionId| {
118 Arc::new(LocalPlatformStore::new(runner.clone(), base_url.clone()))
119 as Arc<dyn PlatformStore>
120 });
121 self.runtime_backends = self.runtime_backends.with_platform_store_factory(factory);
122 self
123 }
124
125 /// Internal org id this backend set is scoped to.
126 pub fn org_id(&self) -> i64 {
127 self.org_id
128 }
129
130 pub fn owner_principal_id(&self) -> PrincipalId {
131 self.profile.owner_principal_id
132 }
133}