kadmin 0.7.2

Rust bindings for the Kerberos administration interface (kadm5)
Documentation
//! Utility to run a krb5 KDC
use std::{ffi::CStr, process::Command};

use anyhow::Result;
use kadmin::KAdm5Variant;
use pyo3::ffi::c_str;
#[allow(unused_imports)]
use pyo3::{prelude::*, types::PyDict};

#[allow(dead_code)]
const K5REALM_MIT_INIT: &CStr = c_str!(
    r#"
import logging
import os
import shutil
from copy import deepcopy
from k5test import realm

logging.basicConfig(level=logging.DEBUG)

def _discover_path(name, default, paths):
    if name in paths:
        return paths[name]
    path = shutil.which(name)
    if path is not None:
        return path
    else:
        return default
realm._discover_path = _discover_path

saved_env = deepcopy(os.environ)

realm_paths = {}
for name in ("kdb5_util", "krb5kdc", "kadmin", "kadmin.local", "kadmind", "kprop", "kinit", "klist"):
    path = os.environ.get("K5TEST_MIT_" + name.upper().replace(".", "_"))
    if path is not None:
        realm_paths[name] = path

realm = realm.MITRealm(start_kadmind=True, **realm_paths)
realm.http_princ = f"HTTP/testserver@{realm.realm}"
realm.http_keytab = os.path.join(realm.tmpdir, "http_keytab")
realm.addprinc(realm.http_princ)
realm.extract_keytab(realm.http_princ, realm.http_keytab)

for k, v in realm.env.items():
    os.environ[k] = v
"#
);

#[allow(dead_code)]
const K5REALM_HEIMDAL_INIT: &CStr = c_str!(
    r#"
import logging
import os
import shutil
from copy import deepcopy
from k5test import realm

logging.basicConfig(level=logging.DEBUG)

saved_env = deepcopy(os.environ)

def _discover_path(name, default, paths):
    if name in paths:
        return paths[name]
    path = shutil.which(name)
    if path is not None:
        return path
    else:
        return default
realm._discover_path = _discover_path

realm.HeimdalRealm.hostname = None
realm.HeimdalRealm.hostname = "localhost"

def start_kadmind(self, env=None):
    if self._kadmind_proc:
        raise Exception("kadmind already started")
    if env is None:
        env = self.env
    config_file = f"--config-file={env['KRB5_CONFIG']}"
    port = "--ports=%s" % (self.portbase + 1)
    args = [self.kadmind, config_file, port]
    self._kadmind_proc = self._start_daemon(args)
realm.HeimdalRealm.start_kadmind = start_kadmind

def prep_kadmin(self, princname=None, pw=None, flags=None):
    if princname is None:
        princname = self.admin_princ
        pw = self.password("admin")
    return self.kinit(
        princname,
        pw,
        flags=["-S", "kadmin/admin", "-c", self.kadmin_ccache] + (flags or [])
    )
realm.HeimdalRealm.prep_kadmin = prep_kadmin

realm_paths = {}
for name in ("kdc", "kadmin", "kadmind", "kinit", "klist", "ktutil"):
    path = os.environ.get("K5TEST_HEIMDAL_" + name.upper().replace(".", "_"))
    if path is not None:
        realm_paths[name] = path

realm = realm.HeimdalRealm(start_kadmind=True, **realm_paths)
realm.http_princ = f"HTTP/testserver@{realm.realm}"
realm.http_keytab = os.path.join(realm.tmpdir, "http_keytab")
realm.addprinc(realm.http_princ)
realm.extract_keytab(realm.http_princ, realm.http_keytab)

for k, v in realm.env.items():
    os.environ[k] = v
"#
);

const RESTORE_ENV: &CStr = c_str!(
    r#"
import os
from copy import deepcopy

def restore_env(saved_env):
    for k in deepcopy(os.environ):
        if k in saved_env:
            os.environ[k] = saved_env[k]
        else:
            del os.environ[k]
"#
);

pub(crate) struct K5Test {
    realm: Py<PyAny>,
    saved_env: Py<PyAny>,
}

impl K5Test {
    #[allow(dead_code)]
    pub(crate) fn new(variant: KAdm5Variant) -> Result<Self> {
        let (realm, saved_env) = Python::attach(|py| {
            let module = match variant {
                #[cfg(mit_client)]
                KAdm5Variant::MitClient => {
                    PyModule::from_code(py, K5REALM_MIT_INIT, c_str!(""), c_str!(""))?
                }
                #[cfg(mit_server)]
                KAdm5Variant::MitServer => {
                    PyModule::from_code(py, K5REALM_MIT_INIT, c_str!(""), c_str!(""))?
                }
                #[cfg(heimdal_client)]
                KAdm5Variant::HeimdalClient => {
                    PyModule::from_code(py, K5REALM_HEIMDAL_INIT, c_str!(""), c_str!(""))?
                }
                #[cfg(heimdal_server)]
                KAdm5Variant::HeimdalServer => {
                    PyModule::from_code(py, K5REALM_HEIMDAL_INIT, c_str!(""), c_str!(""))?
                }
            };
            let realm = module.getattr("realm")?;
            let saved_env = module.getattr("saved_env")?;
            Ok::<(Py<PyAny>, Py<PyAny>), PyErr>((realm.into(), saved_env.into()))
        })?;

        Ok(Self { realm, saved_env })
    }

    #[allow(dead_code)]
    pub(crate) fn realm_name(&self) -> Result<String> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let realm_name: String = realm.getattr("realm")?.extract()?;
            Ok(realm_name)
        })
    }

    #[allow(dead_code)]
    pub(crate) fn tmpdir(&self) -> Result<String> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let tmpdir: String = realm.getattr("tmpdir")?.extract()?;
            Ok(tmpdir)
        })
    }

    #[allow(dead_code)]
    pub(crate) fn user_princ(&self) -> Result<String> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let user_princ: String = realm.getattr("user_princ")?.extract()?;
            Ok(user_princ)
        })
    }

    #[allow(dead_code)]
    pub(crate) fn admin_princ(&self) -> Result<String> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let admin_princ: String = realm.getattr("admin_princ")?.extract()?;
            Ok(admin_princ)
        })
    }

    #[allow(dead_code)]
    pub(crate) fn kadmin_ccache(&self) -> Result<String> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let kadmin_ccache: String = realm.getattr("kadmin_ccache")?.extract()?;
            Ok(kadmin_ccache)
        })
    }

    #[allow(dead_code)]
    pub(crate) fn password(&self, name: &str) -> Result<String> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let password: String = realm.call_method1("password", (name,))?.extract()?;
            Ok(password)
        })
    }

    #[allow(dead_code)]
    pub(crate) fn kinit(&self, name: &str, password: &str) -> Result<()> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            realm.call_method1("kinit", (name, password))?;
            Ok(())
        })
    }

    #[allow(dead_code)]
    pub(crate) fn prep_kadmin(&self) -> Result<()> {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            realm.call_method0("prep_kadmin")?;
            Ok(())
        })
    }
}

impl Drop for K5Test {
    fn drop(&mut self) {
        Python::attach(|py| {
            let realm = self.realm.bind(py);
            let saved_env = self.saved_env.bind(py);

            realm.call_method0("stop")?;

            let module = PyModule::from_code(py, RESTORE_ENV, c_str!(""), c_str!(""))?;
            let restore_env = module.getattr("restore_env")?;
            restore_env.call1((saved_env,))?;

            Ok::<(), PyErr>(())
        })
        .unwrap();
        Command::new("pkill").arg("krb5kdc").output().unwrap();
        Command::new("pkill").arg("kdc").output().unwrap();
        Command::new("pkill").arg("kadmind").output().unwrap();
    }
}