Skip to main content

inference_remote_core/
session.rs

1//! `RemoteSessionActor` — analog of the local `ContextActor` (CUDA §5.11).
2//!
3//! Owns the credential and HTTP-client lifecycle for a remote
4//! deployment. Restartable by its parent `RemoteEngineCoreActor` when:
5//!
6//! - sustained 401s suggest the API key has rotated,
7//! - configuration change (endpoint URL, timeouts) requires a fresh
8//!   client,
9//! - operator triggers `cluster.deployment(...).rebuild_session()`.
10//!
11//! In-flight requests held by `RemoteWorkerActor` complete with the
12//! pre-rebuild client; new requests pick up the rebuilt one.
13
14use std::sync::Arc;
15
16use arc_swap::ArcSwap;
17use async_trait::async_trait;
18use rakka_core::actor::{Actor, Context};
19use tokio::sync::oneshot;
20
21use inference_core::deployment::Timeouts;
22use inference_core::error::{InferenceError, InferenceResult};
23use inference_core::runner::SessionRebuildCause;
24use inference_core::SecretString;
25
26use crate::http::{build_client, HttpClient};
27
28/// Configuration the session needs to (re)build its client.
29#[derive(Clone)]
30pub struct SessionConfig {
31    pub user_agent: String,
32    pub timeouts: Timeouts,
33    /// Bearer / api-key credential. Cloned on rebuild so rotation
34    /// requires the secret source to have changed.
35    pub credential: Arc<dyn CredentialProvider>,
36}
37
38#[async_trait]
39pub trait CredentialProvider: Send + Sync {
40    async fn token(&self) -> InferenceResult<SecretString>;
41}
42
43/// Static API key — most common case.
44pub struct StaticApiKey(pub SecretString);
45
46#[async_trait]
47impl CredentialProvider for StaticApiKey {
48    async fn token(&self) -> InferenceResult<SecretString> {
49        // SecretString isn't Clone; we re-create from the underlying
50        // `&str` exposure — secrecy zeroizes on drop, which is fine.
51        use inference_core::ExposeSecret;
52        Ok(SecretString::from(self.0.expose_secret().to_string()))
53    }
54}
55
56/// Snapshot held by every `RemoteWorkerActor`. Shared via `ArcSwap` so
57/// rebuilds are lock-free for readers.
58pub struct SessionSnapshot {
59    pub client: HttpClient,
60    pub credential: SecretString,
61}
62
63pub struct SessionRebuildRequest {
64    pub cause: SessionRebuildCause,
65    pub reply: oneshot::Sender<InferenceResult<()>>,
66}
67
68pub struct RemoteSessionActor {
69    config: SessionConfig,
70    snapshot: Arc<ArcSwap<SessionSnapshot>>,
71}
72
73impl RemoteSessionActor {
74    /// Build the initial snapshot. Call before spawning the actor so
75    /// callers can wire `snapshot()` into worker constructors.
76    pub async fn bootstrap(config: SessionConfig) -> InferenceResult<Self> {
77        let snapshot = Self::build_snapshot(&config).await?;
78        Ok(Self {
79            config,
80            snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
81        })
82    }
83
84    pub fn snapshot(&self) -> Arc<ArcSwap<SessionSnapshot>> {
85        self.snapshot.clone()
86    }
87
88    async fn build_snapshot(config: &SessionConfig) -> InferenceResult<SessionSnapshot> {
89        let client = build_client(&config.timeouts, &config.user_agent)
90            .map_err(|e| InferenceError::Internal(format!("build http client: {e}")))?;
91        let credential = config.credential.token().await?;
92        Ok(SessionSnapshot { client, credential })
93    }
94
95    async fn rebuild(&mut self, cause: SessionRebuildCause) -> InferenceResult<()> {
96        tracing::info!(?cause, "rebuilding remote session");
97        let snap = Self::build_snapshot(&self.config).await?;
98        self.snapshot.store(Arc::new(snap));
99        Ok(())
100    }
101}
102
103#[async_trait]
104impl Actor for RemoteSessionActor {
105    type Msg = SessionRebuildRequest;
106
107    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Self::Msg) {
108        let res = self.rebuild(msg.cause).await;
109        let _ = msg.reply.send(res);
110    }
111}