use async_trait::async_trait;
use chrono::{Datelike, Timelike};
use monty::{
ExcType, ExtFunctionResult, LimitedTracker, MontyDate, MontyDateTime, MontyException,
MontyObject, MontyRun, NameLookupResult, OsFunction, PrintWriter, ResourceLimits, RunProgress,
dir_stat, file_stat, symlink_stat,
};
use std::collections::HashMap;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use super::{Builtin, Context, resolve_path};
use crate::error::Result;
use crate::fs::{FileSystem, FileType};
use crate::interpreter::ExecResult;
const DEFAULT_MAX_ALLOCATIONS: usize = 1_000_000;
const DEFAULT_MAX_DURATION: Duration = Duration::from_secs(30);
const DEFAULT_MAX_MEMORY: usize = 64 * 1024 * 1024; const DEFAULT_MAX_RECURSION: usize = 200;
const DISABLED_STDLIB_MODULES: &[&str] = &["re"];
const VIRTUAL_NOW_UNIX_SECS: i64 = 1_704_067_200; const VIRTUAL_NOW_NANOS: u32 = 123_456_000;
const PYTHON_INPROCESS_OPT_IN_ENV: &str = "BASHKIT_ALLOW_INPROCESS_PYTHON";
#[derive(Debug, Clone, Copy)]
pub(crate) struct PythonInprocessOptIn(pub bool);
fn python_inprocess_enabled(ctx: &Context<'_>) -> bool {
ctx.execution_extension::<PythonInprocessOptIn>()
.is_some_and(|opt_in| opt_in.0)
|| {
#[cfg(test)]
{
let is_enabled = |v: &str| matches!(v, "1" | "true" | "TRUE" | "yes" | "YES");
ctx.env
.get(PYTHON_INPROCESS_OPT_IN_ENV)
.is_some_and(|v| is_enabled(v))
}
#[cfg(not(test))]
{
false
}
}
}
#[derive(Debug, Clone)]
pub struct PythonLimits {
pub max_allocations: usize,
pub max_duration: Duration,
pub max_memory: usize,
pub max_recursion: usize,
}
impl Default for PythonLimits {
fn default() -> Self {
Self {
max_allocations: DEFAULT_MAX_ALLOCATIONS,
max_duration: DEFAULT_MAX_DURATION,
max_memory: DEFAULT_MAX_MEMORY,
max_recursion: DEFAULT_MAX_RECURSION,
}
}
}
impl PythonLimits {
#[must_use]
pub fn max_allocations(mut self, n: usize) -> Self {
self.max_allocations = n;
self
}
#[must_use]
pub fn max_duration(mut self, d: Duration) -> Self {
self.max_duration = d;
self
}
#[must_use]
pub fn max_memory(mut self, bytes: usize) -> Self {
self.max_memory = bytes;
self
}
#[must_use]
pub fn max_recursion(mut self, depth: usize) -> Self {
self.max_recursion = depth;
self
}
}
pub type PythonExternalFnHandler = Arc<
dyn Fn(
String,
Vec<MontyObject>,
Vec<(MontyObject, MontyObject)>,
) -> Pin<Box<dyn Future<Output = ExtFunctionResult> + Send>>
+ Send
+ Sync,
>;
#[derive(Clone)]
pub struct PythonExternalFns {
names: Vec<String>,
handler: PythonExternalFnHandler,
}
impl std::fmt::Debug for PythonExternalFns {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PythonExternalFns")
.field("names", &self.names)
.field("handler", &"<fn>")
.finish()
}
}
pub struct Python {
pub limits: PythonLimits,
external_fns: Option<PythonExternalFns>,
}
impl Python {
pub fn new() -> Self {
Self {
limits: PythonLimits::default(),
external_fns: None,
}
}
pub fn with_limits(limits: PythonLimits) -> Self {
Self {
limits,
external_fns: None,
}
}
pub fn with_external_handler(
mut self,
names: Vec<String>,
handler: PythonExternalFnHandler,
) -> Self {
self.external_fns = Some(PythonExternalFns { names, handler });
self
}
}
impl Default for Python {
fn default() -> Self {
Self::new()
}
}
fn is_disabled_import(code: &str) -> Option<&'static str> {
let is_disabled_module = |module: &str| {
DISABLED_STDLIB_MODULES.iter().any(|disabled| {
module == *disabled
|| (module.starts_with(disabled)
&& module
.as_bytes()
.get(disabled.len())
.is_some_and(|next| *next == b'.'))
})
};
for raw_line in code.lines() {
let line = raw_line.split('#').next().unwrap_or("").trim();
if line.is_empty() {
continue;
}
if let Some(imports) = line.strip_prefix("import ") {
for import in imports.split(',') {
let module = import
.split_whitespace()
.next()
.unwrap_or("")
.trim()
.trim_matches(|c: char| c == '\'' || c == '"')
.trim_end_matches(|c: char| {
!(c.is_ascii_alphanumeric() || c == '_' || c == '.')
})
.to_string();
if is_disabled_module(&module) {
return Some("re");
}
}
}
if let Some(from_stmt) = line.strip_prefix("from ") {
let module = from_stmt
.split_whitespace()
.next()
.unwrap_or("")
.trim()
.trim_matches(|c: char| c == '\'' || c == '"')
.trim_end_matches(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '.'))
.to_string();
if is_disabled_module(&module) {
return Some("re");
}
}
if line.contains("__import__('re'") || line.contains("__import__(\"re\"") {
return Some("re");
}
if line.contains("importlib.import_module('re'")
|| line.contains("importlib.import_module(\"re\"")
{
return Some("re");
}
}
None
}
#[async_trait]
impl Builtin for Python {
fn llm_hint(&self) -> Option<&'static str> {
Some(
"python/python3: Embedded Python (Monty). \
Stdlib: math, pathlib, os.getenv, sys, typing. \
File I/O via pathlib.Path only (no open()). \
No HTTP/network. No classes. No third-party imports.",
)
}
async fn execute(&self, ctx: Context<'_>) -> Result<ExecResult> {
let args = ctx.args;
if args.first().map(|s| s.as_str()) == Some("--version")
|| args.first().map(|s| s.as_str()) == Some("-V")
{
return Ok(ExecResult::ok("Python 3.12.0 (monty)\n".to_string()));
}
if args.first().map(|s| s.as_str()) == Some("--help")
|| args.first().map(|s| s.as_str()) == Some("-h")
{
return Ok(ExecResult::ok(
"usage: python3 [-c cmd | file | -] [arg ...]\n\
Options:\n \
-c cmd : execute code from string\n \
file : execute code from file (VFS)\n \
- : read code from stdin\n \
-V : print version\n"
.to_string(),
));
}
if !python_inprocess_enabled(&ctx) {
return Ok(ExecResult::err(
format!(
"python3: in-process Python disabled by default for security; set {}=1 to enable\n",
PYTHON_INPROCESS_OPT_IN_ENV
),
1,
));
}
let (code, filename) = if let Some(first) = args.first() {
match first.as_str() {
"-c" => {
let code = args.get(1).map(|s| s.as_str()).unwrap_or("");
if code.is_empty() {
return Ok(ExecResult::err(
"python3: option -c requires argument\n".to_string(),
2,
));
}
(code.to_string(), "<string>".to_string())
}
"-" => {
match ctx.stdin {
Some(input) if !input.is_empty() => {
(input.to_string(), "<stdin>".to_string())
}
_ => {
return Ok(ExecResult::err(
"python3: no input from stdin\n".to_string(),
1,
));
}
}
}
arg if arg.starts_with('-') => {
return Ok(ExecResult::err(
format!("python3: unknown option: {arg}\n"),
2,
));
}
script_path => {
let path = resolve_path(ctx.cwd, script_path);
match ctx.fs.read_file(&path).await {
Ok(bytes) => match String::from_utf8(bytes) {
Ok(code) => (code, script_path.to_string()),
Err(_) => {
return Ok(ExecResult::err(
format!(
"python3: can't decode file '{script_path}': not UTF-8\n"
),
1,
));
}
},
Err(_) => {
return Ok(ExecResult::err(
format!(
"python3: can't open file '{script_path}': No such file or directory\n"
),
2,
));
}
}
}
}
} else if let Some(input) = ctx.stdin {
if input.is_empty() {
return Ok(ExecResult::ok(String::new()));
}
(input.to_string(), "<stdin>".to_string())
} else {
return Ok(ExecResult::err(
"python3: interactive mode not supported in virtual mode\n".to_string(),
1,
));
};
if let Some(module) = is_disabled_import(&code) {
return Ok(ExecResult::err(
format!(
"python3: importing module '{module}' is disabled for security reasons (regex DoS risk)\n"
),
1,
));
}
let mut merged_env = ctx.env.clone();
merged_env.extend(
ctx.variables
.iter()
.filter(|(k, _)| !crate::interpreter::is_hidden_variable(k))
.map(|(k, v)| (k.clone(), v.clone())),
);
run_python(
&code,
&filename,
ctx.fs.clone(),
ctx.cwd,
&merged_env,
&self.limits,
self.external_fns.as_ref(),
)
.await
}
}
async fn run_python(
code: &str,
filename: &str,
fs: Arc<dyn FileSystem>,
cwd: &Path,
env: &HashMap<String, String>,
py_limits: &PythonLimits,
external_fns: Option<&PythonExternalFns>,
) -> Result<ExecResult> {
let code = if code.starts_with("#!") {
match code.find('\n') {
Some(pos) => &code[pos + 1..],
None => "",
}
} else {
code
};
let runner = match MontyRun::new(code.to_owned(), filename, vec![]) {
Ok(r) => r,
Err(e) => return Ok(format_exception(e)),
};
let limits = ResourceLimits::new()
.max_allocations(py_limits.max_allocations)
.max_duration(py_limits.max_duration)
.max_memory(py_limits.max_memory)
.max_recursion_depth(Some(py_limits.max_recursion));
let tracker = LimitedTracker::new(limits);
let (mut progress, mut buf) = {
let mut buf = String::new();
match runner.start(vec![], tracker, PrintWriter::CollectString(&mut buf)) {
Ok(p) => (p, buf),
Err(e) => {
return Ok(format_exception_with_output(e, &buf));
}
}
};
loop {
match progress {
RunProgress::OsCall(os_call) => {
let result = handle_os_call(
os_call.function,
&os_call.args,
&os_call.kwargs,
&fs,
cwd,
env,
)
.await;
match os_call.resume(result, PrintWriter::CollectString(&mut buf)) {
Ok(next) => {
progress = next;
}
Err(e) => {
return Ok(format_exception_with_output(e, &buf));
}
}
}
RunProgress::FunctionCall(call) => {
let result = if let Some(ef) = external_fns {
(ef.handler)(
call.function_name.clone(),
call.args.clone(),
call.kwargs.clone(),
)
.await
} else {
ExtFunctionResult::Error(MontyException::new(
ExcType::RuntimeError,
Some(
"no external function handler configured (external functions not enabled)".into(),
),
))
};
match call.resume(result, PrintWriter::CollectString(&mut buf)) {
Ok(next) => {
progress = next;
}
Err(e) => {
return Ok(format_exception_with_output(e, &buf));
}
}
}
RunProgress::NameLookup(lookup) => {
let result = if external_fns
.map(|ef| ef.names.contains(&lookup.name))
.unwrap_or(false)
{
NameLookupResult::Value(MontyObject::Function {
name: lookup.name.clone(),
docstring: None,
})
} else {
NameLookupResult::Undefined
};
match lookup.resume(result, PrintWriter::CollectString(&mut buf)) {
Ok(next) => {
progress = next;
}
Err(e) => {
return Ok(format_exception_with_output(e, &buf));
}
}
}
RunProgress::ResolveFutures(_) => {
let err = MontyException::new(
ExcType::RuntimeError,
Some("async operations not supported in virtual mode".into()),
);
return Ok(format_exception_with_output(err, &buf));
}
RunProgress::Complete(result) => {
if !matches!(result, MontyObject::None) && buf.is_empty() {
buf = format!("{}\n", result.py_repr());
}
return Ok(ExecResult::ok(buf));
}
}
}
}
async fn handle_os_call(
function: OsFunction,
args: &[MontyObject],
kwargs: &[(MontyObject, MontyObject)],
fs: &Arc<dyn FileSystem>,
cwd: &Path,
env: &HashMap<String, String>,
) -> ExtFunctionResult {
match function {
OsFunction::Getenv => return handle_getenv(args, env),
OsFunction::GetEnviron => return handle_get_environ(env),
OsFunction::DateToday => return handle_date_today(),
OsFunction::DateTimeNow => return handle_datetime_now(args),
_ => {}
}
let path = match extract_path(args, cwd) {
Some(p) => p,
None => {
return ExtFunctionResult::Error(MontyException::new(
ExcType::TypeError,
Some("expected path argument".into()),
));
}
};
match function {
OsFunction::Exists => {
let exists = fs.exists(&path).await.unwrap_or(false);
ExtFunctionResult::Return(MontyObject::Bool(exists))
}
OsFunction::IsFile => match fs.stat(&path).await {
Ok(meta) => ExtFunctionResult::Return(MontyObject::Bool(meta.file_type.is_file())),
Err(_) => ExtFunctionResult::Return(MontyObject::Bool(false)),
},
OsFunction::IsDir => match fs.stat(&path).await {
Ok(meta) => ExtFunctionResult::Return(MontyObject::Bool(meta.file_type.is_dir())),
Err(_) => ExtFunctionResult::Return(MontyObject::Bool(false)),
},
OsFunction::IsSymlink => match fs.stat(&path).await {
Ok(meta) => ExtFunctionResult::Return(MontyObject::Bool(meta.file_type.is_symlink())),
Err(_) => ExtFunctionResult::Return(MontyObject::Bool(false)),
},
OsFunction::ReadText => match fs.read_file(&path).await {
Ok(bytes) => match String::from_utf8(bytes) {
Ok(s) => ExtFunctionResult::Return(MontyObject::String(s)),
Err(_) => ExtFunctionResult::Error(MontyException::new(
ExcType::OSError,
Some(format!(
"can't decode '{}': not valid UTF-8",
path.display()
)),
)),
},
Err(e) => map_vfs_error(e, &path),
},
OsFunction::ReadBytes => match fs.read_file(&path).await {
Ok(bytes) => ExtFunctionResult::Return(MontyObject::Bytes(bytes)),
Err(e) => map_vfs_error(e, &path),
},
OsFunction::WriteText => {
let content = match args.get(1) {
Some(MontyObject::String(s)) => s.as_bytes().to_vec(),
_ => {
return ExtFunctionResult::Error(MontyException::new(
ExcType::TypeError,
Some("write_text() requires a string argument".into()),
));
}
};
let len = content.len();
match fs.write_file(&path, &content).await {
Ok(()) => ExtFunctionResult::Return(MontyObject::Int(len as i64)),
Err(e) => map_vfs_error(e, &path),
}
}
OsFunction::WriteBytes => {
let content = match args.get(1) {
Some(MontyObject::Bytes(b)) => b.clone(),
_ => {
return ExtFunctionResult::Error(MontyException::new(
ExcType::TypeError,
Some("write_bytes() requires a bytes argument".into()),
));
}
};
let len = content.len();
match fs.write_file(&path, &content).await {
Ok(()) => ExtFunctionResult::Return(MontyObject::Int(len as i64)),
Err(e) => map_vfs_error(e, &path),
}
}
OsFunction::Mkdir => {
let parents = get_bool_kwarg(kwargs, "parents").unwrap_or(false);
let exist_ok = get_bool_kwarg(kwargs, "exist_ok").unwrap_or(false);
match fs.mkdir(&path, parents).await {
Ok(()) => ExtFunctionResult::Return(MontyObject::None),
Err(e) => {
let msg = e.to_string();
if exist_ok && msg.contains("already exists") {
ExtFunctionResult::Return(MontyObject::None)
} else {
map_vfs_error(e, &path)
}
}
}
}
OsFunction::Unlink => match fs.remove(&path, false).await {
Ok(()) => ExtFunctionResult::Return(MontyObject::None),
Err(e) => map_vfs_error(e, &path),
},
OsFunction::Rmdir => match fs.remove(&path, false).await {
Ok(()) => ExtFunctionResult::Return(MontyObject::None),
Err(e) => map_vfs_error(e, &path),
},
OsFunction::Iterdir => match fs.read_dir(&path).await {
Ok(entries) => {
let items: Vec<MontyObject> = entries
.into_iter()
.map(|e| {
let child = path.join(&e.name);
MontyObject::Path(child.to_string_lossy().to_string())
})
.collect();
ExtFunctionResult::Return(MontyObject::List(items))
}
Err(e) => map_vfs_error(e, &path),
},
OsFunction::Stat => match fs.stat(&path).await {
Ok(meta) => {
let mtime = meta
.modified
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0);
let stat_obj = match meta.file_type {
FileType::Directory => dir_stat(meta.mode as i64, mtime),
FileType::Symlink => symlink_stat(meta.mode as i64, mtime),
_ => file_stat(meta.mode as i64, meta.size as i64, mtime),
};
ExtFunctionResult::Return(stat_obj)
}
Err(e) => map_vfs_error(e, &path),
},
OsFunction::Rename => {
let target = match args.get(1) {
Some(MontyObject::Path(p)) | Some(MontyObject::String(p)) => {
resolve_python_path(p, cwd)
}
_ => {
return ExtFunctionResult::Error(MontyException::new(
ExcType::TypeError,
Some("rename() requires a target path".into()),
));
}
};
match fs.rename(&path, &target).await {
Ok(()) => ExtFunctionResult::Return(MontyObject::Path(
target.to_string_lossy().to_string(),
)),
Err(e) => map_vfs_error(e, &path),
}
}
OsFunction::Resolve | OsFunction::Absolute => {
ExtFunctionResult::Return(MontyObject::Path(path.to_string_lossy().to_string()))
}
_ => ExtFunctionResult::Error(MontyException::new(
ExcType::OSError,
Some(format!("{function} not supported in virtual mode")),
)),
}
}
fn extract_path(args: &[MontyObject], cwd: &Path) -> Option<PathBuf> {
match args.first()? {
MontyObject::Path(s) | MontyObject::String(s) => Some(resolve_python_path(s, cwd)),
_ => None,
}
}
fn resolve_python_path(path_str: &str, cwd: &Path) -> PathBuf {
let p = Path::new(path_str);
if p.is_absolute() {
p.to_owned()
} else {
cwd.join(p)
}
}
fn map_vfs_error(e: crate::Error, path: &Path) -> ExtFunctionResult {
let msg = e.to_string();
let path_str = path.display().to_string();
let (exc_type, errno) = if msg.contains("not found") || msg.contains("No such file") {
(ExcType::FileNotFoundError, 2)
} else if msg.contains("is a directory") {
(ExcType::IsADirectoryError, 21)
} else if msg.contains("not a directory") {
(ExcType::NotADirectoryError, 20)
} else if msg.contains("already exists") {
(ExcType::FileExistsError, 17)
} else {
(ExcType::OSError, 0)
};
ExtFunctionResult::Error(MontyException::new(
exc_type,
Some(format!("[Errno {errno}] {msg}: '{path_str}'")),
))
}
fn get_bool_kwarg(kwargs: &[(MontyObject, MontyObject)], name: &str) -> Option<bool> {
for (key, val) in kwargs {
if let MontyObject::String(k) = key
&& k == name
{
return match val {
MontyObject::Bool(b) => Some(*b),
_ => None,
};
}
}
None
}
fn handle_getenv(args: &[MontyObject], env: &HashMap<String, String>) -> ExtFunctionResult {
let key = match args.first() {
Some(MontyObject::String(s)) => s.as_str(),
_ => {
return ExtFunctionResult::Error(MontyException::new(
ExcType::TypeError,
Some("getenv() requires a string argument".into()),
));
}
};
let default = match args.get(1) {
Some(MontyObject::None) | None => MontyObject::None,
Some(other) => other.clone(),
};
match env.get(key) {
Some(val) => ExtFunctionResult::Return(MontyObject::String(val.clone())),
None => ExtFunctionResult::Return(default),
}
}
fn handle_get_environ(env: &HashMap<String, String>) -> ExtFunctionResult {
let pairs: Vec<(MontyObject, MontyObject)> = env
.iter()
.map(|(k, v)| {
(
MontyObject::String(k.clone()),
MontyObject::String(v.clone()),
)
})
.collect();
ExtFunctionResult::Return(MontyObject::dict(pairs))
}
fn handle_date_today() -> ExtFunctionResult {
let now = virtual_now_utc();
ExtFunctionResult::Return(MontyObject::Date(MontyDate {
year: now.year(),
month: now.month() as u8,
day: now.day() as u8,
}))
}
fn handle_datetime_now(args: &[MontyObject]) -> ExtFunctionResult {
let base_utc = virtual_now_utc();
let tz = match args.first() {
Some(MontyObject::TimeZone(tz)) => Some(tz),
_ => None,
};
if let Some(tz) = tz {
let offset = chrono::FixedOffset::east_opt(tz.offset_seconds)
.unwrap_or(chrono::FixedOffset::east_opt(0).expect("UTC offset is always valid"));
let dt = base_utc.with_timezone(&offset);
ExtFunctionResult::Return(MontyObject::DateTime(MontyDateTime {
year: dt.year(),
month: dt.month() as u8,
day: dt.day() as u8,
hour: dt.hour() as u8,
minute: dt.minute() as u8,
second: dt.second() as u8,
microsecond: dt.nanosecond() / 1000,
offset_seconds: Some(tz.offset_seconds),
timezone_name: tz.name.clone(),
}))
} else {
let dt = base_utc.naive_utc();
ExtFunctionResult::Return(MontyObject::DateTime(MontyDateTime {
year: dt.year(),
month: dt.month() as u8,
day: dt.day() as u8,
hour: dt.hour() as u8,
minute: dt.minute() as u8,
second: dt.second() as u8,
microsecond: dt.nanosecond() / 1000,
offset_seconds: None,
timezone_name: None,
}))
}
}
fn virtual_now_utc() -> chrono::DateTime<chrono::Utc> {
chrono::DateTime::<chrono::Utc>::from_timestamp(VIRTUAL_NOW_UNIX_SECS, VIRTUAL_NOW_NANOS)
.expect("virtual timestamp constant must be valid")
}
fn format_exception(e: MontyException) -> ExecResult {
ExecResult::err(format!("{e}\n"), 1)
}
fn format_exception_with_output(e: MontyException, printed: &str) -> ExecResult {
let stderr = format!("{e}\n");
let mut result = ExecResult::err(stderr, 1);
if !printed.is_empty() {
result.stdout = printed.to_string();
}
result
}
#[cfg(test)]
mod tests {
use super::*;
use crate::builtins::Context;
use crate::fs::InMemoryFs;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
fn opt_in_env() -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert(PYTHON_INPROCESS_OPT_IN_ENV.to_string(), "1".to_string());
env
}
async fn run(args: &[&str], stdin: Option<&str>) -> ExecResult {
let args: Vec<String> = args.iter().map(|s| s.to_string()).collect();
let env = opt_in_env();
let mut variables = HashMap::new();
let mut cwd = PathBuf::from("/home/user");
let fs = Arc::new(InMemoryFs::new());
let ctx = Context::new_for_test(&args, &env, &mut variables, &mut cwd, fs, stdin);
Python::new().execute(ctx).await.unwrap()
}
async fn run_with_file(args: &[&str], file_path: &str, content: &str) -> ExecResult {
let args: Vec<String> = args.iter().map(|s| s.to_string()).collect();
let env = opt_in_env();
let mut variables = HashMap::new();
let mut cwd = PathBuf::from("/home/user");
let fs = Arc::new(InMemoryFs::new());
fs.write_file(std::path::Path::new(file_path), content.as_bytes())
.await
.unwrap();
let ctx = Context::new_for_test(&args, &env, &mut variables, &mut cwd, fs, None);
Python::new().execute(ctx).await.unwrap()
}
async fn run_with_vfs(
args: &[&str],
files: &[(&str, &str)],
env_vars: &[(&str, &str)],
) -> ExecResult {
let args: Vec<String> = args.iter().map(|s| s.to_string()).collect();
let mut env: HashMap<String, String> = env_vars
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
env.insert(PYTHON_INPROCESS_OPT_IN_ENV.to_string(), "1".to_string());
let mut variables = HashMap::new();
let mut cwd = PathBuf::from("/home/user");
let fs = Arc::new(InMemoryFs::new());
for (path, content) in files {
let p = std::path::Path::new(path);
if let Some(parent) = p.parent() {
let _ = fs.mkdir(parent, true).await;
}
fs.write_file(p, content.as_bytes()).await.unwrap();
}
let ctx = Context::new_for_test(&args, &env, &mut variables, &mut cwd, fs, None);
Python::new().execute(ctx).await.unwrap()
}
#[tokio::test]
async fn test_version() {
let r = run(&["--version"], None).await;
assert_eq!(r.exit_code, 0);
assert!(r.stdout.contains("Python 3.12.0"));
}
#[tokio::test]
async fn test_inline_print() {
let r = run(&["-c", "print('hello world')"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "hello world\n");
}
#[tokio::test]
async fn test_inline_expression() {
let r = run(&["-c", "2 + 3"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "5\n");
}
#[tokio::test]
async fn test_inline_multiline() {
let r = run(&["-c", "x = 10\ny = 20\nprint(x + y)"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "30\n");
}
#[tokio::test]
async fn test_syntax_error() {
let r = run(&["-c", "def"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("SyntaxError") || r.stderr.contains("Error"));
}
#[tokio::test]
async fn test_runtime_error() {
let r = run(&["-c", "1/0"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("ZeroDivisionError"));
}
#[tokio::test]
async fn test_stdin_code() {
let r = run(&["-"], Some("print('from stdin')")).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "from stdin\n");
}
#[tokio::test]
async fn test_piped_stdin() {
let r = run(&[], Some("print('piped')")).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "piped\n");
}
#[tokio::test]
async fn test_file_execution() {
let r = run_with_file(&["script.py"], "/home/user/script.py", "print('from file')").await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "from file\n");
}
#[tokio::test]
async fn test_file_not_found() {
let r = run(&["missing.py"], None).await;
assert_eq!(r.exit_code, 2);
assert!(r.stderr.contains("can't open file"));
}
#[tokio::test]
async fn test_no_args_no_stdin() {
let r = run(&[], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("interactive mode not supported"));
}
#[tokio::test]
async fn test_c_flag_missing_arg() {
let r = run(&["-c"], None).await;
assert_eq!(r.exit_code, 2);
assert!(r.stderr.contains("requires argument"));
}
#[tokio::test]
async fn test_unknown_option() {
let r = run(&["-x"], None).await;
assert_eq!(r.exit_code, 2);
assert!(r.stderr.contains("unknown option"));
}
#[tokio::test]
async fn test_help() {
let r = run(&["--help"], None).await;
assert_eq!(r.exit_code, 0);
assert!(r.stdout.contains("usage:"));
}
#[tokio::test]
async fn test_dict_access() {
let r = run(&["-c", "d = dict()\nd['a'] = 1\nprint(d['a'])"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "1\n");
}
#[tokio::test]
async fn test_list_comprehension() {
let r = run(&["-c", "[x*2 for x in range(3)]"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "[0, 2, 4]\n");
}
#[tokio::test]
async fn test_fstring() {
let r = run(&["-c", "x = 42\nprint(f'value={x}')"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "value=42\n");
}
#[tokio::test]
async fn test_recursion_limit() {
let r = run(&["-c", "def r(): r()\nr()"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("RecursionError") || r.stderr.contains("recursion"));
}
#[tokio::test]
async fn test_shebang_stripped() {
let r = run_with_file(
&["script.py"],
"/home/user/script.py",
"#!/usr/bin/env python3\nprint('shebang ok')",
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "shebang ok\n");
}
#[tokio::test]
async fn test_name_error() {
let r = run(&["-c", "print(undefined_var)"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("NameError"));
}
#[tokio::test]
async fn test_type_error() {
let r = run(&["-c", "1 + 'a'"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("TypeError"));
}
#[tokio::test]
async fn test_index_error() {
let r = run(&["-c", "lst = [1, 2]\nprint(lst[10])"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("IndexError"));
}
#[tokio::test]
async fn test_empty_stdin() {
let r = run(&["-"], Some("")).await;
assert_eq!(r.exit_code, 1);
}
#[tokio::test]
async fn test_output_before_error() {
let r = run(&["-c", "print('before')\n1/0"], None).await;
assert_eq!(r.exit_code, 1);
assert_eq!(r.stdout, "before\n");
assert!(r.stderr.contains("ZeroDivisionError"));
}
#[tokio::test]
async fn test_vfs_read_text() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\nprint(Path('/tmp/hello.txt').read_text())",
],
&[("/tmp/hello.txt", "hello from vfs")],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "hello from vfs\n");
}
#[tokio::test]
async fn test_vfs_write_text() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\nPath('/tmp/out.txt').write_text('written by python')\nprint(Path('/tmp/out.txt').read_text())",
],
&[],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "written by python\n");
}
#[tokio::test]
async fn test_vfs_exists() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\nprint(Path('/tmp/hello.txt').exists())\nprint(Path('/tmp/nope.txt').exists())",
],
&[("/tmp/hello.txt", "content")],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "True\nFalse\n");
}
#[tokio::test]
async fn test_vfs_is_file_is_dir() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\nprint(Path('/tmp/f.txt').is_file())\nprint(Path('/tmp').is_dir())",
],
&[("/tmp/f.txt", "data")],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "True\nTrue\n");
}
#[tokio::test]
async fn test_vfs_read_not_found() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\ntry:\n Path('/no/such/file').read_text()\nexcept FileNotFoundError as e:\n print('caught:', e)",
],
&[],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert!(r.stdout.contains("caught:"));
assert!(r.stdout.contains("not found") || r.stdout.contains("No such file"));
}
#[tokio::test]
async fn test_vfs_mkdir() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\nPath('/tmp/newdir').mkdir()\nprint(Path('/tmp/newdir').is_dir())",
],
&[],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "True\n");
}
#[tokio::test]
async fn test_vfs_iterdir() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\nfor p in Path('/data').iterdir():\n print(p.name)",
],
&[("/data/a.txt", "a"), ("/data/b.txt", "b")],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert!(r.stdout.contains("a.txt"));
assert!(r.stdout.contains("b.txt"));
}
#[tokio::test]
async fn test_vfs_getenv() {
let r = run_with_vfs(
&[
"-c",
"import os\nprint(os.getenv('MY_VAR'))\nprint(os.getenv('MISSING', 'default'))",
],
&[],
&[("MY_VAR", "hello")],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "hello\ndefault\n");
}
#[tokio::test]
async fn test_vfs_stat() {
let r = run_with_vfs(
&[
"-c",
"from pathlib import Path\ninfo = Path('/tmp/f.txt').stat()\nprint(info.st_size)",
],
&[("/tmp/f.txt", "12345")],
&[],
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "5\n");
}
#[tokio::test]
async fn test_custom_limits_tight_memory() {
let limits = PythonLimits::default().max_memory(1024);
let py = Python::with_limits(limits);
let args = vec!["-c".to_string(), "x = list(range(100000))".to_string()];
let env = opt_in_env();
let mut variables = HashMap::new();
let mut cwd = PathBuf::from("/home/user");
let fs = Arc::new(InMemoryFs::new());
let ctx = Context::new_for_test(&args, &env, &mut variables, &mut cwd, fs, None);
let r = py.execute(ctx).await.unwrap();
assert_ne!(r.exit_code, 0, "Tight memory limit should cause failure");
}
#[tokio::test]
async fn test_custom_limits_generous() {
let limits = PythonLimits::default()
.max_allocations(10_000_000)
.max_memory(128 * 1024 * 1024);
let py = Python::with_limits(limits);
let args = vec!["-c".to_string(), "print(sum(range(100)))".to_string()];
let env = opt_in_env();
let mut variables = HashMap::new();
let mut cwd = PathBuf::from("/home/user");
let fs = Arc::new(InMemoryFs::new());
let ctx = Context::new_for_test(&args, &env, &mut variables, &mut cwd, fs, None);
let r = py.execute(ctx).await.unwrap();
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "4950\n");
}
#[test]
fn test_python_limits_builder() {
let limits = PythonLimits::default()
.max_allocations(500)
.max_duration(Duration::from_secs(10))
.max_memory(1024)
.max_recursion(50);
assert_eq!(limits.max_allocations, 500);
assert_eq!(limits.max_duration, Duration::from_secs(10));
assert_eq!(limits.max_memory, 1024);
assert_eq!(limits.max_recursion, 50);
}
#[test]
fn test_python_limits_default() {
let limits = PythonLimits::default();
assert_eq!(limits.max_allocations, 1_000_000);
assert_eq!(limits.max_duration, Duration::from_secs(30));
assert_eq!(limits.max_memory, 64 * 1024 * 1024);
assert_eq!(limits.max_recursion, 200);
}
async fn run_with_external(
code: &str,
fn_names: &[&str],
handler: PythonExternalFnHandler,
) -> ExecResult {
let args = vec!["-c".to_string(), code.to_string()];
let env = opt_in_env();
let mut variables = HashMap::new();
let mut cwd = PathBuf::from("/home/user");
let fs = Arc::new(InMemoryFs::new());
let py = Python::with_limits(PythonLimits::default())
.with_external_handler(fn_names.iter().map(|s| s.to_string()).collect(), handler);
let ctx = Context::new_for_test(&args, &env, &mut variables, &mut cwd, fs, None);
py.execute(ctx).await.unwrap()
}
#[tokio::test]
async fn test_external_fn_return_value() {
let handler: PythonExternalFnHandler = Arc::new(|_name, _args, _kwargs| {
Box::pin(async { ExtFunctionResult::Return(MontyObject::Int(42)) })
});
let r = run_with_external("print(get_answer())", &["get_answer"], handler).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "42\n");
}
#[tokio::test]
async fn test_external_fn_with_args() {
let handler: PythonExternalFnHandler = Arc::new(|_name, args, _kwargs| {
Box::pin(async move {
let a = match &args[0] {
MontyObject::Int(i) => *i,
_ => 0,
};
let b = match &args[1] {
MontyObject::Int(i) => *i,
_ => 0,
};
ExtFunctionResult::Return(MontyObject::Int(a + b))
})
});
let r = run_with_external("print(add(3, 4))", &["add"], handler).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "7\n");
}
#[tokio::test]
async fn test_external_fn_with_kwargs() {
let handler: PythonExternalFnHandler = Arc::new(|_name, _args, kwargs| {
Box::pin(async move {
for (k, v) in &kwargs {
if let (MontyObject::String(key), MontyObject::String(val)) = (k, v)
&& key == "name"
{
return ExtFunctionResult::Return(MontyObject::String(format!(
"hello {val}"
)));
}
}
ExtFunctionResult::Return(MontyObject::String("hello unknown".into()))
})
});
let r = run_with_external("print(greet(name='world'))", &["greet"], handler).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "hello world\n");
}
#[tokio::test]
async fn test_external_fn_error() {
let handler: PythonExternalFnHandler = Arc::new(|_name, _args, _kwargs| {
Box::pin(async {
ExtFunctionResult::Error(MontyException::new(
ExcType::RuntimeError,
Some("something went wrong".into()),
))
})
});
let r = run_with_external("fail()", &["fail"], handler).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("RuntimeError"));
assert!(r.stderr.contains("something went wrong"));
}
#[tokio::test]
async fn test_external_fn_caught_error() {
let handler: PythonExternalFnHandler = Arc::new(|_name, _args, _kwargs| {
Box::pin(async {
ExtFunctionResult::Error(MontyException::new(
ExcType::ValueError,
Some("bad value".into()),
))
})
});
let r = run_with_external(
"try:\n fail()\nexcept ValueError as e:\n print(f'caught: {e}')",
&["fail"],
handler,
)
.await;
assert_eq!(r.exit_code, 0);
assert!(r.stdout.contains("caught:"));
assert!(r.stdout.contains("bad value"));
}
#[tokio::test]
async fn test_external_fn_multiple_calls() {
let counter = Arc::new(std::sync::atomic::AtomicI64::new(0));
let counter_clone = counter.clone();
let handler: PythonExternalFnHandler = Arc::new(move |_name, _args, _kwargs| {
let c = counter_clone.clone();
Box::pin(async move {
let val = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
ExtFunctionResult::Return(MontyObject::Int(val))
})
});
let r = run_with_external(
"a = next_id()\nb = next_id()\nc = next_id()\nprint(a, b, c)",
&["next_id"],
handler,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "0 1 2\n");
}
#[tokio::test]
async fn test_external_fn_returns_string() {
let handler: PythonExternalFnHandler = Arc::new(|_name, args, _kwargs| {
Box::pin(async move {
let input = match &args[0] {
MontyObject::String(s) => s.clone(),
_ => String::new(),
};
ExtFunctionResult::Return(MontyObject::String(input.to_uppercase()))
})
});
let r = run_with_external("print(upper('hello'))", &["upper"], handler).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "HELLO\n");
}
#[tokio::test]
async fn test_external_fn_dispatches_by_name() {
let handler: PythonExternalFnHandler = Arc::new(|name, _args, _kwargs| {
Box::pin(async move {
let result = match name.as_str() {
"get_x" => MontyObject::Int(10),
"get_y" => MontyObject::Int(20),
_ => MontyObject::None,
};
ExtFunctionResult::Return(result)
})
});
let r = run_with_external("print(get_x() + get_y())", &["get_x", "get_y"], handler).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout, "30\n");
}
#[tokio::test]
async fn test_unregistered_name_reference_raises_name_error() {
let handler: PythonExternalFnHandler = Arc::new(|_name, _args, _kwargs| {
Box::pin(async { ExtFunctionResult::Return(MontyObject::Int(1)) })
});
let r = run_with_external("x = unknown_var", &["registered_fn"], handler).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("NameError"));
}
#[tokio::test]
async fn test_math_module() {
let r = run(&["-c", "import math; print(math.sqrt(144))"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "12.0");
}
#[tokio::test]
async fn test_re_module_is_blocked() {
let r = run(
&[
"-c",
"import re; m = re.search(r'(\\d+)', 'abc123def'); print(m.group(1))",
],
None,
)
.await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("importing module 're' is disabled"));
}
#[tokio::test]
async fn test_re_module_dynamic_import_is_blocked() {
let r = run(&["-c", "m = __import__('re')"], None).await;
assert_eq!(r.exit_code, 1);
assert!(r.stderr.contains("importing module 're' is disabled"));
}
#[tokio::test]
async fn test_filter_builtin() {
let r = run(
&["-c", "print(list(filter(lambda x: x > 2, [1, 2, 3, 4])))"],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "[3, 4]");
}
#[tokio::test]
async fn test_getattr_builtin() {
let r = run(
&["-c", "print(getattr('hello', 'missing_attr', 'default'))"],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "default");
}
#[tokio::test]
async fn test_tuple_comparison() {
let r = run(&["-c", "print((1, 2) < (1, 3))"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "True");
}
#[tokio::test]
async fn test_pep448_unpacking() {
let r = run(&["-c", "a = [1, 2]; b = [3, 4]; print([*a, *b])"], None).await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "[1, 2, 3, 4]");
}
#[tokio::test]
async fn test_dict_constructor_from_iterable() {
let r = run(
&[
"-c",
"d = dict([('a', 1), ('b', 2)]); print(d['a'], d['b'])",
],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "1 2");
}
#[tokio::test]
async fn test_date_today() {
let r = run(
&[
"-c",
"from datetime import date\nd = date.today()\nprint(f'{d.year}-{d.month}-{d.day}')",
],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "2024-1-1");
}
#[tokio::test]
async fn test_datetime_now_naive() {
let r = run(
&[
"-c",
"from datetime import datetime\ndt = datetime.now()\nprint(f'{dt.year}-{dt.month}-{dt.day} {dt.hour}:{dt.minute}:{dt.second}.{dt.microsecond}')",
],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "2024-1-1 0:0:0.123456");
}
#[tokio::test]
async fn test_datetime_now_utc() {
let r = run(
&[
"-c",
"from datetime import datetime, timezone\ndt = datetime.now(timezone.utc)\nprint(f'{dt.year}-{dt.month}-{dt.day} {dt.hour}:{dt.minute}:{dt.second}.{dt.microsecond}')",
],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "2024-1-1 0:0:0.123456");
}
#[tokio::test]
async fn test_datetime_attributes() {
let r = run(
&[
"-c",
"from datetime import datetime\ndt = datetime.now()\nassert 1 <= dt.month <= 12\nassert 1 <= dt.day <= 31\nassert 0 <= dt.hour <= 23\nprint('ok')",
],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "ok");
}
#[tokio::test]
async fn test_json_dumps_loads() {
let r = run(
&[
"-c",
"import json\nd = {'a': 1, 'b': [2, 3]}\ns = json.dumps(d)\nprint(json.loads(s) == d)",
],
None,
)
.await;
assert_eq!(r.exit_code, 0);
assert_eq!(r.stdout.trim(), "True");
}
}