use std::collections::BTreeMap;
use std::io;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::LazyLock;
use memchr::memmem::Finder;
use serde::Deserialize;
use thiserror::Error;
use tracing::instrument;
use url::Url;
use uv_configuration::NoSources;
use uv_normalize::PackageName;
use uv_pep440::VersionSpecifiers;
use uv_pypi_types::VerbatimParsedUrl;
use uv_redacted::DisplaySafeUrl;
use uv_settings::{GlobalOptions, ResolverInstallerSchema};
use uv_warnings::warn_user;
use uv_workspace::pyproject::{ExtraBuildDependency, Sources};
static FINDER: LazyLock<Finder> = LazyLock::new(|| Finder::new(b"# /// script"));
#[derive(Debug)]
pub enum Pep723Item {
Script(Pep723Script),
Stdin(Pep723Metadata),
Remote(Pep723Metadata, DisplaySafeUrl),
}
impl Pep723Item {
pub fn metadata(&self) -> &Pep723Metadata {
match self {
Self::Script(script) => &script.metadata,
Self::Stdin(metadata) => metadata,
Self::Remote(metadata, ..) => metadata,
}
}
pub fn into_metadata(self) -> Pep723Metadata {
match self {
Self::Script(script) => script.metadata,
Self::Stdin(metadata) => metadata,
Self::Remote(metadata, ..) => metadata,
}
}
pub fn path(&self) -> Option<&Path> {
match self {
Self::Script(script) => Some(&script.path),
Self::Stdin(..) => None,
Self::Remote(..) => None,
}
}
pub fn as_script(&self) -> Option<&Pep723Script> {
match self {
Self::Script(script) => Some(script),
_ => None,
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum Pep723ItemRef<'item> {
Script(&'item Pep723Script),
Stdin(&'item Pep723Metadata),
Remote(&'item Pep723Metadata, &'item Url),
}
impl Pep723ItemRef<'_> {
pub fn metadata(&self) -> &Pep723Metadata {
match self {
Self::Script(script) => &script.metadata,
Self::Stdin(metadata) => metadata,
Self::Remote(metadata, ..) => metadata,
}
}
pub fn path(&self) -> Option<&Path> {
match self {
Self::Script(script) => Some(&script.path),
Self::Stdin(..) => None,
Self::Remote(..) => None,
}
}
pub fn directory(&self) -> Result<PathBuf, io::Error> {
match self {
Self::Script(script) => Ok(std::path::absolute(&script.path)?
.parent()
.expect("script path has no parent")
.to_owned()),
Self::Stdin(..) | Self::Remote(..) => std::env::current_dir(),
}
}
pub fn indexes(&self, source_strategy: &NoSources) -> &[uv_distribution_types::Index] {
match source_strategy {
NoSources::None => self
.metadata()
.tool
.as_ref()
.and_then(|tool| tool.uv.as_ref())
.and_then(|uv| uv.top_level.index.as_deref())
.unwrap_or(&[]),
NoSources::All | NoSources::Packages(_) => &[],
}
}
pub fn sources(&self, source_strategy: &NoSources) -> &BTreeMap<PackageName, Sources> {
static EMPTY: BTreeMap<PackageName, Sources> = BTreeMap::new();
match source_strategy {
NoSources::None => self
.metadata()
.tool
.as_ref()
.and_then(|tool| tool.uv.as_ref())
.and_then(|uv| uv.sources.as_ref())
.unwrap_or(&EMPTY),
NoSources::All | NoSources::Packages(_) => &EMPTY,
}
}
}
impl<'item> From<&'item Pep723Item> for Pep723ItemRef<'item> {
fn from(item: &'item Pep723Item) -> Self {
match item {
Pep723Item::Script(script) => Self::Script(script),
Pep723Item::Stdin(metadata) => Self::Stdin(metadata),
Pep723Item::Remote(metadata, url) => Self::Remote(metadata, url),
}
}
}
impl<'item> From<&'item Pep723Script> for Pep723ItemRef<'item> {
fn from(script: &'item Pep723Script) -> Self {
Self::Script(script)
}
}
#[derive(Debug, Clone)]
pub struct Pep723Script {
pub path: PathBuf,
pub metadata: Pep723Metadata,
pub prelude: String,
pub postlude: String,
}
impl Pep723Script {
pub async fn read(file: impl AsRef<Path>) -> Result<Option<Self>, Pep723Error> {
let contents = fs_err::tokio::read(&file).await?;
let ScriptTag {
prelude,
metadata,
postlude,
} = match ScriptTag::parse(&contents) {
Ok(Some(tag)) => tag,
Ok(None) => return Ok(None),
Err(err) => return Err(err),
};
let metadata = Pep723Metadata::from_str(&metadata)?;
Ok(Some(Self {
path: std::path::absolute(file)?,
metadata,
prelude,
postlude,
}))
}
pub async fn init(
file: impl AsRef<Path>,
requires_python: &VersionSpecifiers,
) -> Result<Self, Pep723Error> {
let contents = fs_err::tokio::read(&file).await?;
let (prelude, metadata, postlude) = Self::init_metadata(&contents, requires_python)?;
Ok(Self {
path: std::path::absolute(file)?,
metadata,
prelude,
postlude,
})
}
pub fn init_metadata(
contents: &[u8],
requires_python: &VersionSpecifiers,
) -> Result<(String, Pep723Metadata, String), Pep723Error> {
let default_metadata = if requires_python.is_empty() {
indoc::formatdoc! {r"
dependencies = []
",
}
} else {
indoc::formatdoc! {r#"
requires-python = "{requires_python}"
dependencies = []
"#,
requires_python = requires_python,
}
};
let metadata = Pep723Metadata::from_str(&default_metadata)?;
let (shebang, postlude) = extract_shebang(contents)?;
let postlude = if postlude.strip_prefix('#').is_some_and(|postlude| {
postlude
.chars()
.next()
.is_some_and(|c| matches!(c, ' ' | '\r' | '\n'))
}) {
format!("\n{postlude}")
} else {
postlude
};
Ok((
if shebang.is_empty() {
String::new()
} else {
format!("{shebang}\n")
},
metadata,
postlude,
))
}
pub async fn create(
file: impl AsRef<Path>,
requires_python: &VersionSpecifiers,
existing_contents: Option<Vec<u8>>,
bare: bool,
) -> Result<(), Pep723Error> {
let file = file.as_ref();
let script_name = file
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| Pep723Error::InvalidFilename(file.to_string_lossy().to_string()))?;
let default_metadata = indoc::formatdoc! {r#"
requires-python = "{requires_python}"
dependencies = []
"#,
};
let metadata = serialize_metadata(&default_metadata);
let script = if let Some(existing_contents) = existing_contents {
let (mut shebang, contents) = extract_shebang(&existing_contents)?;
if !shebang.is_empty() {
shebang.push_str("\n#\n");
if !regex::Regex::new(r"\buv\b").unwrap().is_match(&shebang) {
warn_user!(
"If you execute {} directly, it might ignore its inline metadata.\nConsider replacing its shebang with: {}",
file.to_string_lossy().cyan(),
"#!/usr/bin/env -S uv run --script".cyan(),
);
}
}
indoc::formatdoc! {r"
{shebang}{metadata}
{contents}" }
} else if bare {
metadata
} else {
indoc::formatdoc! {r#"
{metadata}
def main() -> None:
print("Hello from {name}!")
if __name__ == "__main__":
main()
"#,
metadata = metadata,
name = script_name,
}
};
Ok(fs_err::tokio::write(file, script).await?)
}
pub fn write(&self, metadata: &str) -> Result<(), io::Error> {
let content = format!(
"{}{}{}",
self.prelude,
serialize_metadata(metadata),
self.postlude
);
fs_err::write(&self.path, content)?;
Ok(())
}
pub fn sources(&self) -> &BTreeMap<PackageName, Sources> {
static EMPTY: BTreeMap<PackageName, Sources> = BTreeMap::new();
self.metadata
.tool
.as_ref()
.and_then(|tool| tool.uv.as_ref())
.and_then(|uv| uv.sources.as_ref())
.unwrap_or(&EMPTY)
}
}
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Pep723Metadata {
pub dependencies: Option<Vec<uv_pep508::Requirement<VerbatimParsedUrl>>>,
pub requires_python: Option<VersionSpecifiers>,
pub tool: Option<Tool>,
#[serde(skip)]
pub raw: String,
}
impl Pep723Metadata {
pub fn parse(contents: &[u8]) -> Result<Option<Self>, Pep723Error> {
let ScriptTag { metadata, .. } = match ScriptTag::parse(contents) {
Ok(Some(tag)) => tag,
Ok(None) => return Ok(None),
Err(err) => return Err(err),
};
Ok(Some(Self::from_str(&metadata)?))
}
pub async fn read(file: impl AsRef<Path>) -> Result<Option<Self>, Pep723Error> {
let contents = fs_err::tokio::read(&file).await?;
let ScriptTag { metadata, .. } = match ScriptTag::parse(&contents) {
Ok(Some(tag)) => tag,
Ok(None) => return Ok(None),
Err(err) => return Err(err),
};
Ok(Some(Self::from_str(&metadata)?))
}
}
impl FromStr for Pep723Metadata {
type Err = toml::de::Error;
#[instrument(name = "toml::from_str PEP 723 metadata", skip_all)]
fn from_str(raw: &str) -> Result<Self, Self::Err> {
let metadata = toml::from_str(raw)?;
Ok(Self {
raw: raw.to_string(),
..metadata
})
}
}
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Tool {
pub uv: Option<ToolUv>,
}
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields, rename_all = "kebab-case")]
pub struct ToolUv {
#[serde(flatten)]
pub globals: GlobalOptions,
#[serde(flatten)]
pub top_level: ResolverInstallerSchema,
pub override_dependencies: Option<Vec<uv_pep508::Requirement<VerbatimParsedUrl>>>,
pub exclude_dependencies: Option<Vec<uv_normalize::PackageName>>,
pub constraint_dependencies: Option<Vec<uv_pep508::Requirement<VerbatimParsedUrl>>>,
pub build_constraint_dependencies: Option<Vec<uv_pep508::Requirement<VerbatimParsedUrl>>>,
pub extra_build_dependencies: Option<BTreeMap<PackageName, Vec<ExtraBuildDependency>>>,
pub sources: Option<BTreeMap<PackageName, Sources>>,
}
#[derive(Debug, Error)]
pub enum Pep723Error {
#[error(
"An opening tag (`# /// script`) was found without a closing tag (`# ///`). Ensure that every line between the opening and closing tags (including empty lines) starts with a leading `#`."
)]
UnclosedBlock,
#[error("The PEP 723 metadata block is missing from the script.")]
MissingTag,
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
Utf8(#[from] std::str::Utf8Error),
#[error(transparent)]
Toml(#[from] toml::de::Error),
#[error("Invalid filename `{0}` supplied")]
InvalidFilename(String),
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ScriptTag {
prelude: String,
metadata: String,
postlude: String,
}
impl ScriptTag {
pub fn parse(contents: &[u8]) -> Result<Option<Self>, Pep723Error> {
let Some(index) = FINDER.find(contents) else {
return Ok(None);
};
if !(index == 0 || matches!(contents[index - 1], b'\r' | b'\n')) {
return Ok(None);
}
let prelude = std::str::from_utf8(&contents[..index])?;
let contents = &contents[index..];
let contents = std::str::from_utf8(contents)?;
let mut lines = contents.lines();
if lines.next().is_none_or(|line| line != "# /// script") {
return Ok(None);
}
let mut toml = vec![];
for line in lines {
let Some(line) = line.strip_prefix('#') else {
break;
};
if line.is_empty() {
toml.push("");
continue;
}
let Some(line) = line.strip_prefix(' ') else {
break;
};
toml.push(line);
}
let Some(index) = toml.iter().rev().position(|line| *line == "///") else {
return Err(Pep723Error::UnclosedBlock);
};
let index = toml.len() - index;
toml.truncate(index - 1);
let prelude = prelude.to_string();
let metadata = toml.join("\n") + "\n";
let postlude = contents
.lines()
.skip(index + 1)
.collect::<Vec<_>>()
.join("\n")
+ "\n";
Ok(Some(Self {
prelude,
metadata,
postlude,
}))
}
}
fn extract_shebang(contents: &[u8]) -> Result<(String, String), Pep723Error> {
let contents = std::str::from_utf8(contents)?;
if contents.starts_with("#!") {
let bytes = contents.as_bytes();
let index = bytes
.iter()
.position(|&b| b == b'\r' || b == b'\n')
.unwrap_or(bytes.len());
let width = match bytes.get(index) {
Some(b'\r') => {
if bytes.get(index + 1) == Some(&b'\n') {
2
} else {
1
}
}
Some(b'\n') => 1,
_ => 0,
};
let shebang = contents[..index].to_string();
let script = contents[index + width..].to_string();
Ok((shebang, script))
} else {
Ok((String::new(), contents.to_string()))
}
}
fn serialize_metadata(metadata: &str) -> String {
let mut output = String::with_capacity(metadata.len() + 32);
output.push_str("# /// script");
output.push('\n');
for line in metadata.lines() {
output.push('#');
if !line.is_empty() {
output.push(' ');
output.push_str(line);
}
output.push('\n');
}
output.push_str("# ///");
output.push('\n');
output
}
#[cfg(test)]
mod tests {
use crate::{Pep723Error, Pep723Script, ScriptTag, serialize_metadata};
use std::str::FromStr;
#[test]
fn missing_space() {
let contents = indoc::indoc! {r"
# /// script
#requires-python = '>=3.11'
# ///
"};
assert!(matches!(
ScriptTag::parse(contents.as_bytes()),
Err(Pep723Error::UnclosedBlock)
));
}
#[test]
fn no_closing_pragma() {
let contents = indoc::indoc! {r"
# /// script
# requires-python = '>=3.11'
# dependencies = [
# 'requests<3',
# 'rich',
# ]
"};
assert!(matches!(
ScriptTag::parse(contents.as_bytes()),
Err(Pep723Error::UnclosedBlock)
));
}
#[test]
fn leading_content() {
let contents = indoc::indoc! {r"
pass # /// script
# requires-python = '>=3.11'
# dependencies = [
# 'requests<3',
# 'rich',
# ]
# ///
#
#
"};
assert_eq!(ScriptTag::parse(contents.as_bytes()).unwrap(), None);
}
#[test]
fn simple() {
let contents = indoc::indoc! {r"
# /// script
# requires-python = '>=3.11'
# dependencies = [
# 'requests<3',
# 'rich',
# ]
# ///
import requests
from rich.pretty import pprint
resp = requests.get('https://peps.python.org/api/peps.json')
data = resp.json()
"};
let expected_metadata = indoc::indoc! {r"
requires-python = '>=3.11'
dependencies = [
'requests<3',
'rich',
]
"};
let expected_data = indoc::indoc! {r"
import requests
from rich.pretty import pprint
resp = requests.get('https://peps.python.org/api/peps.json')
data = resp.json()
"};
let actual = ScriptTag::parse(contents.as_bytes()).unwrap().unwrap();
assert_eq!(actual.prelude, String::new());
assert_eq!(actual.metadata, expected_metadata);
assert_eq!(actual.postlude, expected_data);
}
#[test]
fn simple_with_shebang() {
let contents = indoc::indoc! {r"
#!/usr/bin/env python3
# /// script
# requires-python = '>=3.11'
# dependencies = [
# 'requests<3',
# 'rich',
# ]
# ///
import requests
from rich.pretty import pprint
resp = requests.get('https://peps.python.org/api/peps.json')
data = resp.json()
"};
let expected_metadata = indoc::indoc! {r"
requires-python = '>=3.11'
dependencies = [
'requests<3',
'rich',
]
"};
let expected_data = indoc::indoc! {r"
import requests
from rich.pretty import pprint
resp = requests.get('https://peps.python.org/api/peps.json')
data = resp.json()
"};
let actual = ScriptTag::parse(contents.as_bytes()).unwrap().unwrap();
assert_eq!(actual.prelude, "#!/usr/bin/env python3\n".to_string());
assert_eq!(actual.metadata, expected_metadata);
assert_eq!(actual.postlude, expected_data);
}
#[test]
fn embedded_comment() {
let contents = indoc::indoc! {r"
# /// script
# embedded-csharp = '''
# /// <summary>
# /// text
# ///
# /// </summary>
# public class MyClass { }
# '''
# ///
"};
let expected = indoc::indoc! {r"
embedded-csharp = '''
/// <summary>
/// text
///
/// </summary>
public class MyClass { }
'''
"};
let actual = ScriptTag::parse(contents.as_bytes())
.unwrap()
.unwrap()
.metadata;
assert_eq!(actual, expected);
}
#[test]
fn trailing_lines() {
let contents = indoc::indoc! {r"
# /// script
# requires-python = '>=3.11'
# dependencies = [
# 'requests<3',
# 'rich',
# ]
# ///
#
#
"};
let expected = indoc::indoc! {r"
requires-python = '>=3.11'
dependencies = [
'requests<3',
'rich',
]
"};
let actual = ScriptTag::parse(contents.as_bytes())
.unwrap()
.unwrap()
.metadata;
assert_eq!(actual, expected);
}
#[test]
fn serialize_metadata_formatting() {
let metadata = indoc::indoc! {r"
requires-python = '>=3.11'
dependencies = [
'requests<3',
'rich',
]
"};
let expected_output = indoc::indoc! {r"
# /// script
# requires-python = '>=3.11'
# dependencies = [
# 'requests<3',
# 'rich',
# ]
# ///
"};
let result = serialize_metadata(metadata);
assert_eq!(result, expected_output);
}
#[test]
fn serialize_metadata_empty() {
let metadata = "";
let expected_output = "# /// script\n# ///\n";
let result = serialize_metadata(metadata);
assert_eq!(result, expected_output);
}
#[test]
fn script_init_empty() {
let contents = "".as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(postlude, "");
}
#[test]
fn script_init_requires_python() {
let contents = "".as_bytes();
let (prelude, metadata, postlude) = Pep723Script::init_metadata(
contents,
&uv_pep440::VersionSpecifiers::from_str(">=3.8").unwrap(),
)
.unwrap();
assert_eq!(prelude, "");
assert_eq!(
metadata.raw,
indoc::indoc! {r#"
requires-python = ">=3.8"
dependencies = []
"#}
);
assert_eq!(postlude, "");
}
#[test]
fn script_init_with_hashbang() {
let contents = indoc::indoc! {r#"
#!/usr/bin/env python3
print("Hello, world!")
"#}
.as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "#!/usr/bin/env python3\n");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(
postlude,
indoc::indoc! {r#"
print("Hello, world!")
"#}
);
}
#[test]
fn script_init_with_other_metadata() {
let contents = indoc::indoc! {r#"
# /// noscript
# Hello,
#
# World!
# ///
print("Hello, world!")
"#}
.as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(
postlude,
indoc::indoc! {r#"
# /// noscript
# Hello,
#
# World!
# ///
print("Hello, world!")
"#}
);
}
#[test]
fn script_init_with_hashbang_and_other_metadata() {
let contents = indoc::indoc! {r#"
#!/usr/bin/env python3
# /// noscript
# Hello,
#
# World!
# ///
print("Hello, world!")
"#}
.as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "#!/usr/bin/env python3\n");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(
postlude,
indoc::indoc! {r#"
# /// noscript
# Hello,
#
# World!
# ///
print("Hello, world!")
"#}
);
}
#[test]
fn script_init_with_valid_metadata_line() {
let contents = indoc::indoc! {r#"
# Hello,
# /// noscript
#
# World!
# ///
print("Hello, world!")
"#}
.as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(
postlude,
indoc::indoc! {r#"
# Hello,
# /// noscript
#
# World!
# ///
print("Hello, world!")
"#}
);
}
#[test]
fn script_init_with_valid_empty_metadata_line() {
let contents = indoc::indoc! {r#"
#
# /// noscript
# Hello,
# World!
# ///
print("Hello, world!")
"#}
.as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(
postlude,
indoc::indoc! {r#"
#
# /// noscript
# Hello,
# World!
# ///
print("Hello, world!")
"#}
);
}
#[test]
fn script_init_with_non_metadata_comment() {
let contents = indoc::indoc! {r#"
#Hello,
# /// noscript
#
# World!
# ///
print("Hello, world!")
"#}
.as_bytes();
let (prelude, metadata, postlude) =
Pep723Script::init_metadata(contents, &uv_pep440::VersionSpecifiers::default())
.unwrap();
assert_eq!(prelude, "");
assert_eq!(
metadata.raw,
indoc::indoc! {r"
dependencies = []
"}
);
assert_eq!(
postlude,
indoc::indoc! {r#"
#Hello,
# /// noscript
#
# World!
# ///
print("Hello, world!")
"#}
);
}
}