Skip to main content

rlmesh_sandbox/
lib.rs

1mod docker;
2mod error;
3mod hf;
4mod source;
5mod wheel;
6
7use std::collections::BTreeMap;
8
9use anyhow::Result;
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12
13pub use error::SandboxError;
14pub use source::{EnvironmentSourceRef, GymSourceRef, HfSourceRef};
15pub(crate) use wheel::ResolvedRlmeshPackage;
16
/// Base image used when neither [`SandboxOptions::base_image`] nor the
/// `RLMESH_SANDBOX_BASE_IMAGE` environment variable supplies one.
pub const DEFAULT_BASE_IMAGE: &str = "python:3.11-slim";
/// Package name used by [`default_rlmesh_package`] to build the default
/// `name==version` requirement string.
pub const DEFAULT_PACKAGE_NAME: &str = "rlmesh";
/// Schema version recorded on every resolved spec and folded into the build
/// hash input.
pub const BOOTSTRAP_SCHEMA_VERSION: u32 = 1;
20
/// Caller-supplied knobs for building and starting a sandbox.
///
/// Fields left at their [`Default`] value fall back to environment variables
/// or crate defaults where the corresponding `resolved_*` method says so.
#[derive(Debug, Clone)]
pub struct SandboxOptions {
    /// Base image override. When `None`, `resolved_base_image` consults
    /// `RLMESH_SANDBOX_BASE_IMAGE` and then [`DEFAULT_BASE_IMAGE`].
    pub base_image: Option<String>,
    /// Override for the rlmesh package spec. When `None`, the
    /// `RLMESH_SANDBOX_RLMESH_PACKAGE` environment variable and then
    /// [`default_rlmesh_package`] are used.
    pub rlmesh_package: Option<String>,
    /// Extra package specs. Each entry is trimmed and must be non-empty and
    /// single-line; the list is part of the build hash.
    pub packages: Vec<String>,
    /// Import entries. Validated the same way as `packages` and also part of
    /// the build hash.
    pub imports: Vec<String>,
    /// Runtime-only keyword arguments; excluded from the build hash.
    pub kwargs: BTreeMap<String, serde_json::Value>,
    /// Number of environments; must be at least 1. Runtime-only.
    pub num_envs: usize,
    /// How multiple environments are vectorized. Runtime-only.
    pub vectorization_mode: VectorizationMode,
    /// Must be `true` for `hf://` sources, which execute remote code.
    pub trust_remote_code: bool,
    /// Allows `hf://` sources whose revision is not a full 40-character git
    /// SHA (or is missing).
    pub allow_unpinned_hf: bool,
    /// Opt-in memory ceiling for the build. `None` builds via the default docker
    /// builder (today's behaviour). A docker size string (e.g. `"20g"`) or the
    /// literal `"auto"` routes the build through a bounded `docker-container`
    /// buildx builder so an OOM is a clean cgroup-local build failure instead of
    /// a host freeze. Host-relative, never baked, so it stays out of the build
    /// hash.
    pub build_memory: Option<String>,
}
40
41impl Default for SandboxOptions {
42    fn default() -> Self {
43        Self {
44            base_image: None,
45            rlmesh_package: None,
46            packages: Vec::new(),
47            imports: Vec::new(),
48            kwargs: BTreeMap::new(),
49            num_envs: 1,
50            vectorization_mode: VectorizationMode::Sync,
51            trust_remote_code: false,
52            allow_unpinned_hf: false,
53            build_memory: None,
54        }
55    }
56}
57
/// How multiple environments are vectorized.
///
/// Serialized in `snake_case`, i.e. as `"sync"` / `"async"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum VectorizationMode {
    /// Synchronous vectorization; the default in [`SandboxOptions`].
    Sync,
    /// Asynchronous vectorization.
    Async,
}
64
65impl VectorizationMode {
66    pub fn parse(value: Option<&str>) -> std::result::Result<Self, SandboxError> {
67        match value.unwrap_or("sync").trim() {
68            "sync" => Ok(Self::Sync),
69            "async" => Ok(Self::Async),
70            other => Err(SandboxError::invalid_option(format!(
71                "vectorization_mode must be 'sync' or 'async', got '{other}'"
72            ))),
73        }
74    }
75
76    pub fn as_str(self) -> &'static str {
77        match self {
78            Self::Sync => "sync",
79            Self::Async => "async",
80        }
81    }
82}
83
84impl SandboxOptions {
85    pub fn resolved_base_image(&self) -> String {
86        self.base_image
87            .clone()
88            .or_else(|| std::env::var("RLMESH_SANDBOX_BASE_IMAGE").ok())
89            .filter(|value| !value.trim().is_empty())
90            .unwrap_or_else(|| DEFAULT_BASE_IMAGE.to_string())
91    }
92
93    /// The requested build memory ceiling: field, else
94    /// `RLMESH_SANDBOX_BUILD_MEMORY`, else `None`. The raw value (a docker size
95    /// string or the literal `"auto"`/`"off"`) is interpreted by the docker
96    /// backend; this only applies the field-over-env precedence, mirroring
97    /// [`resolved_base_image`](Self::resolved_base_image).
98    pub fn resolved_build_memory(&self) -> Option<String> {
99        self.build_memory
100            .clone()
101            .or_else(|| std::env::var("RLMESH_SANDBOX_BUILD_MEMORY").ok())
102            .map(|value| value.trim().to_string())
103            .filter(|value| !value.is_empty())
104    }
105
106    fn resolved_rlmesh_package(&self, base_image: &str) -> Result<ResolvedRlmeshPackage> {
107        let selected = self
108            .rlmesh_package
109            .clone()
110            .or_else(|| {
111                std::env::var("RLMESH_SANDBOX_RLMESH_PACKAGE")
112                    .ok()
113                    .filter(|value| !value.trim().is_empty())
114            })
115            .unwrap_or_else(default_rlmesh_package);
116
117        wheel::resolve_rlmesh_package(validate_nonempty("rlmesh_package", selected)?, base_image)
118    }
119}
120
/// Details of a started sandbox container, returned by [`start_env`] and
/// [`start_env_async`].
///
/// Dropping this without recording `container_id` leaks a running container, so
/// it is `#[must_use]`. It is `#[non_exhaustive]` so future fields (extra
/// container metadata and ports) can be added without breaking callers that
/// read fields by name.
#[derive(Debug, Clone)]
#[must_use = "dropping a RunResult without its container_id leaks the started container"]
#[non_exhaustive]
pub struct RunResult {
    /// Display form of the source as the caller requested it.
    pub requested_source: String,
    /// Display form of the source after resolution.
    pub resolved_source: String,
    /// Address reported by the backend for the started container.
    pub address: String,
    /// Id of the started container; pass it to [`stop_container`] to clean up.
    pub container_id: String,
}
137
/// Fully validated and resolved sandbox description, produced by
/// `EffectiveSandboxSpec::resolve` from a source plus [`SandboxOptions`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct EffectiveSandboxSpec {
    /// Always [`BOOTSTRAP_SCHEMA_VERSION`] for specs built by `resolve`.
    pub schema_version: u32,
    /// The source exactly as the caller passed it.
    pub requested_source: EnvironmentSourceRef,
    /// The source after `resolve_source` (for `hf://`, with the revision resolved).
    pub resolved_source: source::ResolvedEnvironmentSourceRef,
    /// Trimmed, non-empty base image reference.
    pub base_image: String,
    /// The rlmesh package as resolved by `wheel::resolve_rlmesh_package`.
    pub rlmesh_package: ResolvedRlmeshPackage,
    /// Validated extra package specs (part of the build hash).
    pub packages: Vec<String>,
    /// Validated import entries (part of the build hash).
    pub imports: Vec<String>,
    /// Runtime-only keyword arguments; excluded from `build_hash`.
    pub kwargs: BTreeMap<String, serde_json::Value>,
    /// Runtime-only environment count (at least 1); excluded from `build_hash`.
    pub num_envs: usize,
    /// Runtime-only vectorization mode; excluded from `build_hash`.
    pub vectorization_mode: VectorizationMode,
    /// Opt-in build memory ceiling (see [`SandboxOptions::build_memory`]).
    /// Excluded from `build_hash` -- host-relative, never baked into the image.
    pub build_memory: Option<String>,
    /// Lowercase-hex SHA-256 over the JSON-serialized build inputs.
    pub build_hash: String,
}
155
156impl EffectiveSandboxSpec {
157    fn resolve(
158        source: EnvironmentSourceRef,
159        options: SandboxOptions,
160    ) -> std::result::Result<Self, SandboxError> {
161        let build_memory = options.resolved_build_memory();
162
163        let base_image = validate_nonempty("base_image", options.resolved_base_image())
164            .map_err(SandboxError::invalid_option)?;
165        let rlmesh_package = options
166            .resolved_rlmesh_package(&base_image)
167            .map_err(SandboxError::wheel)?;
168        let packages =
169            validate_specs("packages", options.packages).map_err(SandboxError::invalid_option)?;
170        let imports =
171            validate_specs("imports", options.imports).map_err(SandboxError::invalid_option)?;
172        validate_source_trust(
173            &source,
174            options.trust_remote_code,
175            options.allow_unpinned_hf,
176        )
177        .map_err(SandboxError::huggingface_policy)?;
178        let kwargs = options.kwargs;
179        let num_envs = validate_num_envs(options.num_envs).map_err(SandboxError::invalid_option)?;
180        let vectorization_mode = options.vectorization_mode;
181        let resolved_source = resolve_source(&source).map_err(SandboxError::source_resolution)?;
182
183        // build_hash deliberately excludes runtime-only parameters (kwargs,
184        // num_envs, vectorization_mode): they are delivered to the container at
185        // `docker run` time via the bootstrap payload, never baked into the
186        // image, so changing them must not produce a new image tag or trigger a
187        // rebuild.
188        let build_hash = build_hash(&BuildHashInput {
189            schema_version: BOOTSTRAP_SCHEMA_VERSION,
190            source: &resolved_source,
191            base_image: &base_image,
192            rlmesh_package: &rlmesh_package,
193            packages: &packages,
194            imports: &imports,
195        })
196        .map_err(SandboxError::invalid_option)?;
197
198        Ok(Self {
199            schema_version: BOOTSTRAP_SCHEMA_VERSION,
200            requested_source: source,
201            resolved_source,
202            base_image,
203            rlmesh_package,
204            packages,
205            imports,
206            kwargs,
207            num_envs,
208            vectorization_mode,
209            build_memory,
210            build_hash,
211        })
212    }
213
214    pub(crate) fn slug(&self) -> String {
215        self.resolved_source.slug()
216    }
217
218    /// The deterministic local image reference for this spec -- the single
219    /// source of truth for both the build (`ensure_image`) and the export tag.
220    pub(crate) fn image_tag(&self) -> String {
221        format!(
222            "rlmesh-sandbox-{}:{}",
223            self.slug(),
224            &self.build_hash[..12.min(self.build_hash.len())]
225        )
226    }
227
228    pub(crate) fn requested_display(&self) -> String {
229        self.requested_source.to_string()
230    }
231
232    pub(crate) fn resolved_display(&self) -> String {
233        self.resolved_source.to_string()
234    }
235}
236
/// Borrowed view of exactly the inputs that feed `build_hash`.
///
/// Only build-time inputs belong here; runtime-only parameters are left out on
/// purpose. The struct is serialized to JSON and hashed, so adding, removing,
/// renaming, or reordering fields is expected to change every build hash.
#[derive(Serialize)]
struct BuildHashInput<'a> {
    schema_version: u32,
    source: &'a source::ResolvedEnvironmentSourceRef,
    base_image: &'a str,
    rlmesh_package: &'a ResolvedRlmeshPackage,
    packages: &'a [String],
    imports: &'a [String],
}
246
247fn validate_source_trust(
248    source: &EnvironmentSourceRef,
249    trust_remote_code: bool,
250    allow_unpinned_hf: bool,
251) -> Result<()> {
252    let EnvironmentSourceRef::Hf(source) = source else {
253        return Ok(());
254    };
255
256    anyhow::ensure!(
257        trust_remote_code,
258        "hf:// sandbox sources execute remote code from env.py and requirements.txt; pass trust_remote_code=True only for sources you trust"
259    );
260
261    if allow_unpinned_hf {
262        return Ok(());
263    }
264
265    let Some(revision) = source.revision.as_deref() else {
266        anyhow::bail!(
267            "hf:// sandbox sources must pin a full 40-character git SHA by default; pass allow_unpinned_hf=True to opt into branch/tag resolution"
268        );
269    };
270    anyhow::ensure!(
271        looks_like_full_git_sha(revision),
272        "hf:// sandbox revision must be a full 40-character git SHA by default; pass allow_unpinned_hf=True to opt into branch/tag resolution"
273    );
274
275    Ok(())
276}
277
278fn resolve_source(source: &EnvironmentSourceRef) -> Result<source::ResolvedEnvironmentSourceRef> {
279    match source {
280        EnvironmentSourceRef::Gym(source) => {
281            Ok(source::ResolvedEnvironmentSourceRef::Gym(source.clone()))
282        }
283        EnvironmentSourceRef::Hf(source) => {
284            let resolved_revision = hf::resolve_revision(source)?;
285            Ok(source::ResolvedEnvironmentSourceRef::Hf(
286                source::ResolvedHfSourceRef {
287                    repo: source.repo.clone(),
288                    resolved_revision,
289                    suite: source.suite.clone(),
290                    task: source.task.clone(),
291                },
292            ))
293        }
294    }
295}
296
297/// Build the sandbox image and start a container for `source`.
298///
299/// This is a synchronous convenience wrapper around [`start_env_async`]. It
300/// must not be called from within an existing tokio runtime: it creates its
301/// own runtime internally and will panic ("Cannot start a runtime from within
302/// a runtime") if one is already active. From async code, call
303/// [`start_env_async`] directly.
304pub fn start_env(
305    source: EnvironmentSourceRef,
306    options: SandboxOptions,
307) -> std::result::Result<RunResult, SandboxError> {
308    let runtime = tokio::runtime::Runtime::new().map_err(|err| {
309        SandboxError::container_startup(format!("failed to create runtime: {err}"))
310    })?;
311    runtime.block_on(start_env_async(source, options))
312}
313
314/// Build the sandbox image and start a container for `source`.
315///
316/// This is the async-first entry point; it is safe to call from inside a tokio
317/// runtime. The synchronous [`start_env`] wrapper is provided for convenience
318/// and must not be called from an async context.
319pub async fn start_env_async(
320    source: EnvironmentSourceRef,
321    options: SandboxOptions,
322) -> std::result::Result<RunResult, SandboxError> {
323    let spec = EffectiveSandboxSpec::resolve(source, options)?;
324    let docker = docker::DockerBackend;
325    // Best-effort: sweep containers orphaned by a prior hard kill before
326    // starting a new one. Label-keyed and env-agnostic, so this also reclaims
327    // orphaned model containers. A reaper failure must never fail the start.
328    if let Err(err) = docker.reap_orphaned_containers() {
329        tracing::debug!("orphan reap before sandbox start failed: {err:#}");
330    }
331    let artifact = docker.ensure_image(&spec).map_err(|err| {
332        SandboxError::from_docker_op(err, |m| SandboxError::ImageBuild { message: m })
333    })?;
334    let started = docker
335        .run_container_async(&spec, &artifact)
336        .await
337        .map_err(|err| {
338            SandboxError::from_docker_op(err, |m| SandboxError::ContainerStartup { message: m })
339        })?;
340
341    Ok(RunResult {
342        requested_source: spec.requested_display(),
343        resolved_source: spec.resolved_display(),
344        address: started.address,
345        container_id: started.container_id,
346    })
347}
348
349/// Stop and remove a sandbox container by id.
350pub fn stop_container(container_id: &str) -> std::result::Result<(), SandboxError> {
351    docker::DockerBackend
352        .stop_container(container_id)
353        .map_err(|err| SandboxError::from_docker_op(err, |m| SandboxError::Docker { message: m }))
354}
355
356/// Best-effort reap of orphaned rlmesh-owned sandbox containers.
357///
358/// Only containers whose owner process has exited are removed, so this is safe
359/// to call while other live rlmesh processes hold running sessions. Returns the
360/// ids that were removed.
361pub fn reap_orphaned_containers() -> std::result::Result<Vec<String>, SandboxError> {
362    docker::DockerBackend
363        .reap_orphaned_containers()
364        .map_err(|err| SandboxError::from_docker_op(err, |m| SandboxError::Docker { message: m }))
365}
366
367pub fn default_rlmesh_package() -> String {
368    format!(
369        "{DEFAULT_PACKAGE_NAME}=={}",
370        python_package_version(env!("CARGO_PKG_VERSION"))
371    )
372}
373
/// Rewrite a Cargo-style pre-release version into its compact Python form.
///
/// `-alpha.N`, `-beta.N`, and `-rc.N` become `aN`, `bN`, and `rcN`. The
/// markers are tried in that order and only the first match is rewritten; a
/// version containing none of them is returned unchanged.
fn python_package_version(version: &str) -> String {
    // (cargo pre-release marker, python pre-release tag), in priority order.
    const PRERELEASE_TAGS: [(&str, &str); 3] =
        [("-alpha.", "a"), ("-beta.", "b"), ("-rc.", "rc")];

    PRERELEASE_TAGS
        .iter()
        .find_map(|&(marker, tag)| {
            version
                .split_once(marker)
                .map(|(base, suffix)| format!("{base}{tag}{suffix}"))
        })
        .unwrap_or_else(|| version.to_string())
}
386
387fn build_hash(input: &BuildHashInput<'_>) -> Result<String> {
388    let raw = serde_json::to_vec(input)?;
389    let mut hasher = Sha256::new();
390    hasher.update(raw);
391    Ok(hex(&hasher.finalize()))
392}
393
/// Encode `bytes` as lowercase hexadecimal, two digits per byte (e.g. for a
/// SHA-256 digest).
pub(crate) fn hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // The write result is deliberately ignored, as in the fold it replaces.
        let _ = write!(encoded, "{byte:02x}");
    }
    encoded
}
402
/// Wrap `value` in single quotes so `/bin/sh` treats it as one literal
/// argument.
///
/// Each embedded single quote is emitted as `'"'"'`: close the quoted run, add
/// a double-quoted `'`, and reopen.
pub(crate) fn shell_quote(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push_str("'\"'\"'");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
407
408fn validate_nonempty(label: &str, value: String) -> Result<String> {
409    let value = value.trim().to_string();
410    anyhow::ensure!(!value.is_empty(), "{label} must not be empty");
411    anyhow::ensure!(
412        !value.contains('\n') && !value.contains('\r'),
413        "{label} must not contain newlines"
414    );
415    Ok(value)
416}
417
418fn validate_specs(label: &str, values: Vec<String>) -> Result<Vec<String>> {
419    values
420        .into_iter()
421        .map(|value| validate_nonempty(label, value))
422        .collect()
423}
424
425fn validate_num_envs(value: usize) -> Result<usize> {
426    anyhow::ensure!(value > 0, "num_envs must be at least 1");
427    Ok(value)
428}
429
/// Whether `value` has the shape of a full git SHA-1: exactly 40 ASCII hex
/// digits, in either case.
pub(crate) fn looks_like_full_git_sha(value: &str) -> bool {
    const FULL_SHA_LEN: usize = 40;
    // A 40-byte string whose bytes are all ASCII hex digits is necessarily 40
    // ASCII characters, so checking bytes is equivalent to checking chars.
    value.len() == FULL_SHA_LEN && value.bytes().all(|byte| byte.is_ascii_hexdigit())
}
433
// Unit tests for version conversion, the hf:// trust policy, typed errors, and
// which inputs do or do not affect the build hash.
#[cfg(test)]
mod tests {
    use super::*;

    // Cargo pre-release suffixes map to the compact Python spelling.
    #[test]
    fn package_version_uses_pep440_prereleases() {
        assert_eq!(python_package_version("0.1.0-alpha.1"), "0.1.0a1");
        assert_eq!(python_package_version("0.1.0-beta.2"), "0.1.0b2");
        assert_eq!(python_package_version("0.1.0-rc.3"), "0.1.0rc3");
        assert_eq!(python_package_version("0.1.0"), "0.1.0");
    }

    // Gym sources pass the trust check with both opt-ins off.
    #[test]
    fn gym_sources_do_not_require_remote_code_trust() {
        let source = EnvironmentSourceRef::parse("CartPole-v1").unwrap();
        validate_source_trust(&source, false, false).unwrap();
    }

    // Even a fully pinned hf:// source is rejected without trust_remote_code.
    #[test]
    fn hf_sources_require_explicit_remote_code_trust() {
        let source =
            EnvironmentSourceRef::parse("hf://org/repo@0123456789abcdef0123456789abcdef01234567")
                .unwrap();
        let err = validate_source_trust(&source, false, false).unwrap_err();
        assert!(err.to_string().contains("trust_remote_code=True"));
    }

    // A branch name is rejected by default and accepted with allow_unpinned_hf.
    #[test]
    fn hf_sources_require_full_sha_unless_unpinned_is_allowed() {
        let source = EnvironmentSourceRef::parse("hf://org/repo@main").unwrap();
        let err = validate_source_trust(&source, true, false).unwrap_err();
        assert!(err.to_string().contains("40-character git SHA"));
        validate_source_trust(&source, true, true).unwrap();
    }

    // Trusted + pinned to a full SHA is the default-accepted hf:// shape.
    #[test]
    fn hf_sources_accept_full_sha_when_trusted() {
        let source =
            EnvironmentSourceRef::parse("hf://org/repo@0123456789abcdef0123456789abcdef01234567")
                .unwrap();
        validate_source_trust(&source, true, false).unwrap();
    }

    // A build-time input (the base image) must change the build hash.
    #[test]
    fn build_hash_changes_when_inputs_change() {
        let source = EnvironmentSourceRef::parse("CartPole-v1").unwrap();
        let base =
            EffectiveSandboxSpec::resolve(source.clone(), SandboxOptions::default()).unwrap();
        let changed = EffectiveSandboxSpec::resolve(
            source,
            SandboxOptions {
                base_image: Some("python:3.12-slim".to_string()),
                ..SandboxOptions::default()
            },
        )
        .unwrap();

        assert_ne!(base.build_hash, changed.build_hash);
    }

    // Each failure class must map to its own SandboxError variant.
    #[test]
    fn public_errors_are_typed_and_discriminable() {
        // num_envs == 0 must surface as a typed InvalidOption, not a stringly error.
        let err = EffectiveSandboxSpec::resolve(
            EnvironmentSourceRef::parse("CartPole-v1").unwrap(),
            SandboxOptions {
                num_envs: 0,
                ..SandboxOptions::default()
            },
        )
        .unwrap_err();
        assert!(matches!(err, SandboxError::InvalidOption { .. }));

        // Unpinned hf source surfaces as a HuggingFacePolicy error.
        let err = EffectiveSandboxSpec::resolve(
            EnvironmentSourceRef::parse("hf://org/repo@main").unwrap(),
            SandboxOptions {
                trust_remote_code: true,
                ..SandboxOptions::default()
            },
        )
        .unwrap_err();
        assert!(matches!(err, SandboxError::HuggingFacePolicy { .. }));

        // vectorization_mode parse failures are typed too.
        let err = VectorizationMode::parse(Some("parallel")).unwrap_err();
        assert!(matches!(err, SandboxError::InvalidOption { .. }));

        // Source parse failures are typed.
        let err = EnvironmentSourceRef::parse("ftp://nope").unwrap_err();
        assert!(matches!(err, SandboxError::InvalidSource { .. }));
    }

    // Runtime-only parameters must leave the build hash untouched.
    #[test]
    fn build_hash_is_stable_across_runtime_only_params() {
        // kwargs, num_envs, and vectorization_mode are delivered at run time
        // and must not change the image tag, otherwise every gym.make kwarg
        // tweak rebuilds the image and re-downloads the pip layers.
        let source = EnvironmentSourceRef::parse("CartPole-v1").unwrap();
        let base =
            EffectiveSandboxSpec::resolve(source.clone(), SandboxOptions::default()).unwrap();

        let mut kwargs = BTreeMap::new();
        kwargs.insert("render_mode".to_string(), serde_json::json!("rgb_array"));
        let with_runtime_params = EffectiveSandboxSpec::resolve(
            source,
            SandboxOptions {
                kwargs,
                num_envs: 8,
                vectorization_mode: VectorizationMode::Async,
                ..SandboxOptions::default()
            },
        )
        .unwrap();

        assert_eq!(base.build_hash, with_runtime_params.build_hash);
    }

    // Two tasks of the same hf repo/revision differ in display, slug, and hash.
    #[test]
    fn hf_task_changes_resolved_display_slug_and_build_hash() {
        let revision = "0123456789abcdef0123456789abcdef01234567";
        let options = SandboxOptions {
            trust_remote_code: true,
            ..SandboxOptions::default()
        };
        let first = EffectiveSandboxSpec::resolve(
            EnvironmentSourceRef::parse(&format!("hf://org/repo@{revision}:suite/0")).unwrap(),
            options.clone(),
        )
        .unwrap();
        let second = EffectiveSandboxSpec::resolve(
            EnvironmentSourceRef::parse(&format!("hf://org/repo@{revision}:suite/1")).unwrap(),
            options,
        )
        .unwrap();

        assert_eq!(
            first.resolved_display(),
            format!("hf://org/repo@{revision}:suite/0")
        );
        assert_eq!(first.slug(), "org-repo-suite-0");
        assert_ne!(first.build_hash, second.build_hash);
    }
}