use std::result::Result;
use rcgen::{BasicConstraints, CertificateParams, IsCa, Issuer, KeyPair};
use testcontainers::{
core::{
wait::HttpWaitStrategy, CmdWaitFor, ContainerPort, ContainerState, ExecCommand, WaitFor,
},
CopyDataSource, CopyToContainer, Image, TestcontainersError,
};
/// Container TCP port 2222, exposed by the image as the Gitea SSH port.
pub const GITEA_SSH_PORT: ContainerPort = ContainerPort::Tcp(2222);
/// Container TCP port 3000: the web/API port that is probed for readiness and
/// targeted by the API calls issued after start.
pub const GITEA_HTTP_PORT: ContainerPort = ContainerPort::Tcp(3000);
/// Container TCP port 3080; written to `PORT_TO_REDIRECT` in the generated
/// configuration and exposed only when TLS is enabled.
pub const GITEA_HTTP_REDIRECT_PORT: ContainerPort = ContainerPort::Tcp(3080);
/// Admin username used by [`Gitea::default`].
pub const GITEA_DEFAULT_ADMIN_USERNAME: &str = "git-admin";
/// Admin password used by [`Gitea::default`].
pub const GITEA_DEFAULT_ADMIN_PASSWORD: &str = "git-admin";
/// Folder inside the container that receives the generated `app.ini` and,
/// when TLS is enabled, the certificate and key files.
pub const GITEA_CONFIG_FOLDER: &str = "/etc/gitea";
/// Gitea data folder path.
// NOTE(review): not referenced elsewhere in this file; presumably matches the
// data path configured in the bundled `app.ini` template - confirm.
pub const GITEA_DATA_FOLDER: &str = "/var/lib/gitea";
// Docker image name and tag returned by the `Image` implementation.
const GITEA_IMAGE_NAME: &str = "gitea/gitea";
const GITEA_IMAGE_TAG: &str = "1.22.3-rootless";
// File names created under `GITEA_CONFIG_FOLDER`.
const TLS_CERT_FILE_NAME: &str = "cert.pem";
const TLS_KEY_FILE_NAME: &str = "key.pem";
const CONFIG_FILE_NAME: &str = "app.ini";
/// Testcontainers image definition for a Gitea server.
///
/// Start from [`Gitea::default`] and refine the configuration with the
/// `with_*` builder methods before starting the container.
#[derive(Debug, Clone)]
pub struct Gitea {
/// Hostname written into the generated `app.ini` (`DOMAIN`, `SSH_DOMAIN`, `ROOT_URL`).
git_hostname: String,
/// Username of the admin account created after the container starts.
admin_username: String,
/// Password of the admin account created after the container starts.
admin_password: String,
/// Optional public key registered for the admin account through the API.
admin_key: Option<String>,
/// Extra argument lists, each executed as `gitea admin <args...>` after start.
admin_commands: Vec<Vec<String>>,
/// TLS certificate material; `None` means the server is configured for plain HTTP.
tls: Option<GiteaTlsCert>,
/// Repositories created for the admin account after start.
repos: Vec<GiteaRepo>,
/// Files copied into the container before it starts (`app.ini`, plus certificate and key with TLS).
copy_to_sources: Vec<CopyToContainer>,
}
impl Default for Gitea {
    /// Builds the baseline configuration: plain HTTP on `localhost`, the
    /// default admin credentials, no admin key, no extra admin commands and
    /// no repositories. Only the rendered `app.ini` is scheduled for copying.
    fn default() -> Self {
        let default_app_ini = Self::render_app_ini("http", "localhost", false);
        Self {
            git_hostname: String::from("localhost"),
            admin_username: String::from(GITEA_DEFAULT_ADMIN_USERNAME),
            admin_password: String::from(GITEA_DEFAULT_ADMIN_PASSWORD),
            admin_key: None,
            admin_commands: Vec::new(),
            tls: None,
            repos: Vec::new(),
            copy_to_sources: vec![default_app_ini],
        }
    }
}
impl Image for Gitea {
    /// Docker image name.
    fn name(&self) -> &str {
        GITEA_IMAGE_NAME
    }

    /// Docker image tag.
    fn tag(&self) -> &str {
        GITEA_IMAGE_TAG
    }

    /// Conditions that must hold before the container counts as ready.
    ///
    /// The container must log that the web server started on the HTTP port.
    /// Without TLS, `/api/swagger` must additionally answer with status 200;
    /// with TLS, a fixed five second delay is used in place of the HTTP probe.
    fn ready_conditions(&self) -> Vec<WaitFor> {
        let http_check = match self.tls {
            // NOTE(review): no HTTPS probe is configured here, so the TLS
            // variant only waits a fixed amount of time.
            Some(_) => WaitFor::seconds(5),
            None => WaitFor::http(
                HttpWaitStrategy::new("/api/swagger")
                    .with_port(GITEA_HTTP_PORT)
                    .with_expected_status_code(200_u16),
            ),
        };
        vec![
            WaitFor::message_on_stdout(format!(
                "Starting new Web server: tcp:0.0.0.0:{}",
                GITEA_HTTP_PORT.as_u16()
            )),
            http_check,
        ]
    }

    /// Files copied into the container before it starts.
    fn copy_to_sources(&self) -> impl IntoIterator<Item = &CopyToContainer> {
        &self.copy_to_sources
    }

    /// Ports exposed by the container; the redirect port is added only when
    /// TLS is enabled.
    fn expose_ports(&self) -> &[ContainerPort] {
        if self.tls.is_some() {
            &[GITEA_SSH_PORT, GITEA_HTTP_PORT, GITEA_HTTP_REDIRECT_PORT]
        } else {
            &[GITEA_SSH_PORT, GITEA_HTTP_PORT]
        }
    }

    /// Commands executed inside the container once it is ready.
    ///
    /// Order: create the admin user, register the admin public key (if any),
    /// create the configured repositories, then run the extra admin commands,
    /// each prefixed with `gitea admin`. Every command must exit with code 0.
    fn exec_after_start(
        &self,
        _cs: ContainerState,
    ) -> Result<Vec<ExecCommand>, TestcontainersError> {
        let mut start_commands = vec![self.create_admin_user_cmd()];
        start_commands.extend(
            self.admin_key
                .iter()
                .map(|key| self.create_admin_key_cmd(key)),
        );
        start_commands.extend(self.repos.iter().map(|repo| self.create_repo_cmd(repo)));
        // Borrow the stored argument lists instead of cloning the whole
        // collection up front; only the individual arguments are copied.
        start_commands.extend(self.admin_commands.iter().map(|args| {
            let mut argv = Vec::with_capacity(args.len() + 2);
            argv.push("gitea".to_string());
            argv.push("admin".to_string());
            argv.extend(args.iter().cloned());
            argv
        }));
        Ok(start_commands
            .into_iter()
            .map(|argv| ExecCommand::new(argv).with_cmd_ready_condition(CmdWaitFor::exit_code(0)))
            .collect())
    }
}
impl Gitea {
pub fn with_admin_account(
self,
username: impl Into<String>,
password: impl Into<String>,
public_key: Option<String>,
) -> Self {
Self {
admin_username: username.into(),
admin_password: password.into(),
admin_key: public_key,
..self
}
}
pub fn with_git_hostname(self, hostname: impl Into<String>) -> Self {
let new = Self {
git_hostname: hostname.into(),
..self
};
Self {
copy_to_sources: new.generate_copy_to_sources(),
..new
}
}
pub fn with_repo(self, repo: GiteaRepo) -> Self {
let mut repos = self.repos;
repos.push(repo);
Self { repos, ..self }
}
pub fn with_admin_command(self, command: impl IntoIterator<Item = impl Into<String>>) -> Self {
let command = command
.into_iter()
.map(|s| s.into())
.collect::<Vec<String>>();
let mut admin_commands = self.admin_commands;
admin_commands.push(command);
Self {
admin_commands,
..self
}
}
pub fn with_tls(self, enabled: bool) -> Self {
let new = Self {
tls: if enabled {
Some(GiteaTlsCert::default())
} else {
None
},
..self
};
Self {
copy_to_sources: new.generate_copy_to_sources(),
..new
}
}
pub fn with_tls_certs(self, cert: impl Into<String>, key: impl Into<String>) -> Self {
let new = Self {
tls: Some(GiteaTlsCert::from_pem(cert.into(), key.into())),
..self
};
Self {
copy_to_sources: new.generate_copy_to_sources(),
..new
}
}
pub fn tls_ca(&self) -> Option<&str> {
self.tls.as_ref().and_then(|t| t.ca())
}
fn generate_copy_to_sources(&self) -> Vec<CopyToContainer> {
let mut to_copy = vec![];
let app_ini = Self::render_app_ini(
self.protocol(),
self.git_hostname.as_str(),
self.tls.is_some(),
);
to_copy.push(app_ini);
if let Some(tls_config) = &self.tls {
let cert = CopyToContainer::new(
CopyDataSource::Data(tls_config.cert.clone().into_bytes()),
format!("{GITEA_CONFIG_FOLDER}/{TLS_CERT_FILE_NAME}"),
);
let key = CopyToContainer::new(
CopyDataSource::Data(tls_config.key.clone().into_bytes()),
format!("{GITEA_CONFIG_FOLDER}/{TLS_KEY_FILE_NAME}"),
);
to_copy.push(cert);
to_copy.push(key);
}
to_copy
}
fn render_app_ini(protocol: &str, hostname: &str, is_tls: bool) -> CopyToContainer {
let redirect_port = GITEA_HTTP_REDIRECT_PORT.as_u16();
let mut app_ini_template = include_str!("app.ini").to_string();
let host_template_part = format!(
r#"
DOMAIN = {hostname}
SSH_DOMAIN = {hostname}
ROOT_URL = {protocol}://{hostname}/
PROTOCOL = {protocol}
"#,
);
app_ini_template.push_str(&host_template_part);
if is_tls {
let tls_config = format!(
r#"
CERT_FILE = {GITEA_CONFIG_FOLDER}/{TLS_CERT_FILE_NAME}
KEY_FILE = {GITEA_CONFIG_FOLDER}/{TLS_KEY_FILE_NAME}
REDIRECT_OTHER_PORT = true
PORT_TO_REDIRECT = {redirect_port}
"#
);
app_ini_template.push_str(&tls_config);
}
CopyToContainer::new(
CopyDataSource::Data(app_ini_template.into_bytes()),
format!("{GITEA_CONFIG_FOLDER}/{CONFIG_FILE_NAME}"),
)
}
fn create_admin_user_cmd(&self) -> Vec<String> {
vec![
"gitea",
"admin",
"user",
"create",
"--username",
self.admin_username.as_str(),
"--password",
self.admin_password.as_str(),
"--email",
format!("{}@localhost", self.admin_username).as_str(),
"--admin",
]
.into_iter()
.map(String::from)
.collect::<Vec<String>>()
}
fn create_admin_key_cmd(&self, key: &String) -> Vec<String> {
let body = format!(r#"{{"title":"default","key":"{key}","read_only":false}}"#);
self.create_gitea_api_curl_cmd("POST", "/user/keys", Some(body))
}
fn create_repo_cmd(&self, repo: &GiteaRepo) -> Vec<String> {
let (repo, private) = match repo {
GiteaRepo::Private(name) => (name, "true"),
GiteaRepo::Public(name) => (name, "false"),
};
let body = format!(
r#"{{"name":"{repo}","readme":"Default","auto_init":true,"private":{private}}}"#,
);
self.create_gitea_api_curl_cmd("POST", "/user/repos", Some(body))
}
fn create_gitea_api_curl_cmd(
&self,
method: &str,
api_path: &str,
body: Option<String>,
) -> Vec<String> {
let mut curl = vec![
"curl",
"-sk",
"-X",
method,
"-H",
"accept: application/json",
"-H",
"Content-Type: application/json",
"-u",
format!("{}:{}", self.admin_username, self.admin_password).as_str(),
]
.into_iter()
.map(String::from)
.collect::<Vec<String>>();
if let Some(body) = body {
curl.push("-d".to_string());
curl.push(body);
}
curl.push(self.api_url(api_path));
curl
}
fn protocol(&self) -> &str {
if self.tls.is_some() {
"https"
} else {
"http"
}
}
fn api_url(&self, api: &str) -> String {
let api = api.strip_prefix('/').unwrap_or(api);
format!(
"{}://localhost:{}/api/v1/{api}",
self.protocol(),
GITEA_HTTP_PORT.as_u16()
)
}
}
/// Repository to create for the admin account after the container starts.
///
/// The wrapped string is the repository name; the variant selects the value
/// of the `private` flag sent to the API.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum GiteaRepo {
/// Created with `"private": true`.
Private(String),
/// Created with `"private": false`.
Public(String),
}
/// TLS material copied into the container when TLS is enabled.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct GiteaTlsCert {
/// Server certificate in PEM format.
cert: String,
/// Private key of the server certificate in PEM format.
key: String,
/// PEM of the issuing CA; set for generated certificates, `None` for PEM supplied by the caller.
ca: Option<String>,
}
impl Default for GiteaTlsCert {
    /// Generates a certificate for the `localhost` hostname.
    fn default() -> Self {
        let hostname = String::from("localhost");
        Self::new(hostname)
    }
}
impl GiteaTlsCert {
    /// Generates a fresh CA and a server certificate signed by it.
    ///
    /// The server certificate always lists `localhost`, `127.0.0.1` and
    /// `::1`; `hostname` is put first unless it is already one of them.
    ///
    /// # Panics
    ///
    /// Panics if key generation or certificate signing fails, or if
    /// `hostname` is not accepted as a subject alternative name.
    fn new(hostname: impl Into<String>) -> Self {
        let ca_key = KeyPair::generate().expect("failed to generate the CA key pair");
        let mut ca_cert = CertificateParams::new(vec!["Gitea root CA".to_string()])
            .expect("the static CA name must be accepted as certificate parameters");
        ca_cert.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
        let mut hostnames = vec![
            "localhost".to_string(),
            "127.0.0.1".to_string(),
            "::1".to_string(),
        ];
        let hostname = hostname.into();
        // Skip names that are already listed so no SAN entry is duplicated.
        if !hostnames.contains(&hostname) {
            hostnames.insert(0, hostname);
        }
        let key = KeyPair::generate().expect("failed to generate the server key pair");
        let issuer = Issuer::from_params(&ca_cert, &ca_key);
        let cert = CertificateParams::new(hostnames)
            .expect("hostname is not usable as a subject alternative name")
            .signed_by(&key, &issuer)
            .expect("failed to sign the server certificate with the generated CA");
        let ca_cert = ca_cert
            .self_signed(&ca_key)
            .expect("failed to self-sign the CA certificate");
        Self {
            cert: cert.pem(),
            key: key.serialize_pem(),
            ca: Some(ca_cert.pem()),
        }
    }

    /// Wraps caller-provided PEM certificate and key; no CA is recorded.
    fn from_pem(cert: impl Into<String>, key: impl Into<String>) -> Self {
        Self {
            cert: cert.into(),
            key: key.into(),
            ca: None,
        }
    }

    /// PEM of the issuing CA, if this certificate was generated by [`GiteaTlsCert::new`].
    fn ca(&self) -> Option<&str> {
        self.ca.as_deref()
    }
}
// Integration tests: each test starts a real Gitea container through the
// async testcontainers runner and talks to its HTTP API with `reqwest`.
#[cfg(test)]
mod tests {
use reqwest::Certificate;
use serde_json::Value;
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use super::*;
// Public key registered for the admin account in `gitea_custom_admin_credentials`.
const TEST_PUBLIC_KEY: &str =
"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJRE5a67/cTbR6DpWqzBl6BTY0LE0Hg715ZI/FMK7iCH";
// Credentials that differ from the defaults.
const TEST_ADMIN_USERNAME: &str = "non-default-user";
const TEST_ADMIN_PASSWORD: &str = "some-dummy-password";
// Repository names used by `gitea_create_repos`.
const TEST_PUBLIC_REPO: &str = "test-public-repo";
const TEST_PRIVATE_REPO: &str = "test-private-repo";
// Builds the API URL as seen from the test process: the container host plus
// the host port mapped to `GITEA_HTTP_PORT`. A leading `/` in `api` is optional.
async fn api_url(container: &ContainerAsync<Gitea>, api: &str) -> String {
let api = api.strip_prefix('/').unwrap_or(api);
let host = container.get_host().await.unwrap();
let port = container.get_host_port_ipv4(GITEA_HTTP_PORT).await.unwrap();
format!(
"{}://{host}:{port}/api/v1/{api}",
container.image().protocol(),
)
}
// Default image: the default admin account exists and has no public keys.
#[tokio::test]
async fn gitea_defaults() {
let gitea = Gitea::default().start().await.unwrap();
let response = reqwest::Client::new()
.get(api_url(&gitea, &format!("/users/{GITEA_DEFAULT_ADMIN_USERNAME}")).await)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let keys_list = reqwest::Client::new()
.get(api_url(&gitea, "/user/keys").await)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
let keys_list = keys_list.as_array().unwrap();
assert!(keys_list.is_empty());
}
// TLS with a generated certificate: a default client must fail, a client
// that trusts the generated CA must succeed.
#[tokio::test]
async fn gitea_with_tls() {
let gitea = Gitea::default().with_tls(true).start().await.unwrap();
// The default client is expected to fail, presumably because it does not
// trust the generated CA.
let response = reqwest::Client::new()
.get(api_url(&gitea, &format!("/users/{GITEA_DEFAULT_ADMIN_USERNAME}")).await)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await;
assert!(response.is_err());
// Retry with the generated CA added as a root certificate.
let ca = gitea.image().tls_ca().unwrap();
let ca = Certificate::from_pem(ca.as_bytes()).unwrap();
let client = reqwest::ClientBuilder::new()
.use_rustls_tls()
.add_root_certificate(ca)
.build()
.unwrap();
let response = client
.get(api_url(&gitea, &format!("/users/{GITEA_DEFAULT_ADMIN_USERNAME}")).await)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
}
// Custom admin account: the default credentials are rejected and the
// supplied public key is registered for the custom account.
#[tokio::test]
async fn gitea_custom_admin_credentials() {
let gitea = Gitea::default()
.with_admin_account(
TEST_ADMIN_USERNAME,
TEST_ADMIN_PASSWORD,
Some(TEST_PUBLIC_KEY.to_string()),
)
.start()
.await
.unwrap();
// The default credentials must be rejected.
let response = reqwest::Client::new()
.get(api_url(&gitea, "/user/keys").await)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 401);
// The custom account must list exactly one key.
let keys_list = reqwest::Client::new()
.get(api_url(&gitea, "/user/keys").await)
.basic_auth(TEST_ADMIN_USERNAME, Some(TEST_ADMIN_PASSWORD))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
let keys_list = keys_list.as_array().unwrap();
assert_eq!(keys_list.len(), 1);
}
// Repository creation: the public repository is readable anonymously; the
// private one answers 404 anonymously and 200 with admin credentials.
#[tokio::test]
async fn gitea_create_repos() {
let gitea = Gitea::default()
.with_repo(GiteaRepo::Public(TEST_PUBLIC_REPO.to_string()))
.with_repo(GiteaRepo::Private(TEST_PRIVATE_REPO.to_string()))
.start()
.await
.unwrap();
// Anonymous request for the public repository.
let response = reqwest::Client::new()
.get(
api_url(
&gitea,
&format!("/repos/{GITEA_DEFAULT_ADMIN_USERNAME}/{TEST_PUBLIC_REPO}"),
)
.await,
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
// Anonymous request for the private repository.
let response = reqwest::Client::new()
.get(
api_url(
&gitea,
&format!("/repos/{GITEA_DEFAULT_ADMIN_USERNAME}/{TEST_PRIVATE_REPO}"),
)
.await,
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 404);
// Authenticated request for the private repository.
let response = reqwest::Client::new()
.get(
api_url(
&gitea,
&format!("/repos/{GITEA_DEFAULT_ADMIN_USERNAME}/{TEST_PRIVATE_REPO}"),
)
.await,
)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
}
// Extra admin command: a second user is created with `gitea admin user create`
// and can then authenticate against the API.
#[tokio::test]
async fn gitea_admin_commands() {
// Arguments appended after `gitea admin`.
let command = vec![
"user",
"create",
"--username",
TEST_ADMIN_USERNAME,
"--password",
TEST_ADMIN_PASSWORD,
"--email",
format!("{}@localhost", TEST_ADMIN_USERNAME).as_str(),
"--must-change-password=false",
]
.into_iter()
.map(String::from)
.collect::<Vec<String>>();
let gitea = Gitea::default()
.with_admin_command(command)
.start()
.await
.unwrap();
// The new user is visible to the default admin.
let response = reqwest::Client::new()
.get(api_url(&gitea, &format!("/users/{TEST_ADMIN_USERNAME}")).await)
.basic_auth(
GITEA_DEFAULT_ADMIN_USERNAME,
Some(GITEA_DEFAULT_ADMIN_PASSWORD),
)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
// The new user can log in and has exactly one e-mail address.
let response = reqwest::Client::new()
.get(api_url(&gitea, "/user/emails").await)
.basic_auth(TEST_ADMIN_USERNAME, Some(TEST_ADMIN_PASSWORD))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
let response = response.as_array().unwrap();
assert_eq!(response.len(), 1);
}
}