compio-py-dynamic-openssl 0.3.0

Dynamic OpenSSL implementation for compio-py
Documentation
// SPDX-License-Identifier: Apache-2.0 OR MulanPSL-2.0
// Copyright 2026 Fantix King

use std::{
    io::{Read, Write},
    net::IpAddr,
};

pub use self::loader::{get, is_loaded, load};
use self::{
    loader::Error,
    ssl::{HandshakeError, Ssl, SslStream},
    sys as ffi,
};
use pyo3::{
    ffi::{PyObject, c_str},
    prelude::*,
    types::PyDict,
};

mod bio;
pub mod error;
mod loader;
pub mod ssl;
mod sys;

/// Handle pairing a raw OpenSSL `SSL_CTX` pointer with the Python object it
/// was read from.
///
/// Instances are created through the `TryFrom<&Bound<PyAny>>` impl, which
/// pulls the context pointer out of the Python object's in-memory layout and
/// stores a reference to that object next to it.
pub struct SSLContext {
    /// Raw `SSL_CTX` pointer taken from the Python object; verified non-null
    /// when the handle is constructed.
    ptr: *mut ffi::SSL_CTX,
    /// Strong reference to the Python object `ptr` was read from.
    /// NOTE(review): presumably held so the `SSL_CTX` stays alive for as long
    /// as this handle exists — confirm against the Python side's ownership.
    pyobj: Py<PyAny>,
}

impl TryFrom<&Bound<'_, PyAny>> for SSLContext {
    type Error = PyErr;

    /// Extracts the OpenSSL `SSL_CTX` pointer from a Python `ssl.SSLContext`.
    ///
    /// The pointer is read straight out of the object's memory, so the object
    /// is first checked to be an instance of `_ssl._SSLContext` (the C-level
    /// base class of `ssl.SSLContext`). Without that check any Python object
    /// could be reinterpreted, which is undefined behaviour reachable from
    /// safe code.
    ///
    /// The returned handle keeps a strong reference to `obj`.
    ///
    /// # Errors
    ///
    /// - `TypeError` if `obj` is not an `_ssl._SSLContext` instance.
    /// - `ValueError` if the object's `SSL_CTX` pointer is null.
    /// - Any Python error raised while importing `_ssl` or performing the
    ///   instance check.
    fn try_from(obj: &Bound<PyAny>) -> PyResult<Self> {
        // Leading fields of the C struct behind `_ssl._SSLContext`: the
        // object header followed by the `SSL_CTX` pointer.
        // NOTE(review): this mirrors a CPython implementation detail
        // (`PySSLContext` in Modules/_ssl.c) — confirm for every supported
        // interpreter version.
        #[repr(C)]
        struct PySSLContext {
            ob_base: PyObject,
            ctx: *mut ffi::SSL_CTX,
        }

        // Reject foreign objects before touching their memory.
        let base = obj.py().import("_ssl")?.getattr("_SSLContext")?;
        if !obj.is_instance(&base)? {
            return Err(pyo3::exceptions::PyTypeError::new_err(
                "expected an ssl.SSLContext instance",
            ));
        }

        // SAFETY: `obj` was just verified to be an instance of
        // `_ssl._SSLContext`, whose instances begin with the fields mirrored
        // by `PySSLContext`; `obj` is a live reference, so the pointer is
        // valid for this read.
        let ptr = unsafe { (*obj.as_ptr().cast::<PySSLContext>()).ctx };
        if ptr.is_null() {
            return Err(pyo3::exceptions::PyValueError::new_err(
                "SSLContext has null SSL_CTX",
            ));
        }

        Ok(Self {
            ptr,
            pyobj: obj.clone().unbind(),
        })
    }
}

impl SSLContext {
    /// Performs a client-side handshake over `stream` using this context.
    ///
    /// A fresh `Ssl` is created from the stored `SSL_CTX` pointer. When
    /// `domain` does not parse as an IP address it is passed to
    /// `Ssl::set_hostname` before connecting; IP literals skip that step.
    /// NOTE(review): presumably because the hostname is used for SNI, which
    /// does not allow IP literals — confirm against `Ssl::set_hostname`.
    ///
    /// # Errors
    ///
    /// Returns a `HandshakeError` if the session cannot be created, the
    /// hostname cannot be set, or the handshake itself fails.
    pub fn connect<S>(&self, domain: &str, stream: S) -> Result<SslStream<S>, HandshakeError<S>>
    where
        S: Read + Write,
    {
        let mut session = Ssl::new(self.ptr)?;
        let is_ip_literal = domain.parse::<IpAddr>().is_ok();
        if !is_ip_literal {
            session.set_hostname(domain)?;
        }
        session.connect(stream)
    }

    /// Performs a server-side handshake over `stream` using this context.
    ///
    /// # Errors
    ///
    /// Returns a `HandshakeError` if the session cannot be created or the
    /// handshake fails.
    pub fn accept<S>(&self, stream: S) -> Result<SslStream<S>, HandshakeError<S>>
    where
        S: Read + Write,
    {
        Ssl::new(self.ptr)?.accept(stream)
    }
}

impl Clone for SSLContext {
    /// Duplicates the handle: the raw `SSL_CTX` pointer is copied as-is and a
    /// new reference to the backing Python object is taken while attached to
    /// the interpreter.
    fn clone(&self) -> Self {
        let pyobj = Python::attach(|py| self.pyobj.clone_ref(py));
        Self {
            ptr: self.ptr,
            pyobj,
        }
    }
}

pub fn load_py(py: Python) -> PyResult<bool> {
    if is_loaded() {
        return Ok(true);
    }

    let find_lib = c_str!(
        r#"
import inspect, ssl
try:
    lib = inspect.getfile(ssl.SSLSession)
except TypeError:
    import os, sysconfig
    lib = os.path.join(sysconfig.get_config_var("LIBDIR"), sysconfig.get_config_var("LDLIBRARY"))
"#
    );
    let locals = PyDict::new(py);
    py.run(find_lib, None, Some(&locals))?;
    let lib = locals
        .get_item("lib")?
        .expect("defined lib")
        .extract::<String>()?;
    match load(lib) {
        Ok(()) => Ok(true),
        Err(Error::AlreadyLoaded) => Ok(true),
        _ => Ok(false),
    }
}