forge-guardrails 0.1.2

Foundation types for an LLM-agent workflow framework
Documentation
use crate::clients::base::ToolCall;
use serde_json::Value;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

/// Environment variable holding the path of the JSONL log file; when it is
/// unset or empty no sink is configured.
const FORGE_CLASSIFIER_LOG_ENV: &str = "FORGE_CLASSIFIER_LOG";
/// Environment variable overriding the capacity of the writer queue.
const FORGE_CLASSIFIER_LOG_QUEUE_CAPACITY_ENV: &str = "FORGE_CLASSIFIER_LOG_QUEUE_CAPACITY";
/// Environment variable overriding the maximum serialized size of one event.
const FORGE_CLASSIFIER_LOG_MAX_EVENT_BYTES_ENV: &str = "FORGE_CLASSIFIER_LOG_MAX_EVENT_BYTES";
/// Environment variable enabling redaction of sensitive event fields.
const FORGE_CLASSIFIER_LOG_REDACT_ENV: &str = "FORGE_CLASSIFIER_LOG_REDACT";

/// Queue capacity used when no valid override is supplied.
const DEFAULT_QUEUE_CAPACITY: usize = 256;
/// Per-event size limit (256 KiB) used when no valid override is supplied.
const DEFAULT_MAX_EVENT_BYTES: usize = 256 * 1024;
/// Placeholder string written in place of redacted values.
const REDACTED: &str = "[redacted]";

/// Process-wide slot for the shared sink. It starts empty, is filled on first
/// successful `classifier_log_sink_from_env` call, and is cleared again by
/// `shutdown_proxy_classifier_log_sink`.
static CLASSIFIER_LOG_SINK: LazyLock<Mutex<Option<Arc<ClassifierLogSink>>>> =
    LazyLock::new(|| Mutex::new(None));
/// Set once the initialization warning has been emitted, so that
/// `warn_classifier_log_init_once` logs at most one warning per process.
static CLASSIFIER_LOG_INIT_WARNED: AtomicBool = AtomicBool::new(false);

/// Settings used to spawn a [`ClassifierLogSink`].
#[derive(Debug, Clone)]
pub(super) struct ClassifierLogConfig {
    /// File the JSONL events are appended to.
    path: PathBuf,
    /// Capacity of the bounded channel between emitters and the writer.
    queue_capacity: usize,
    /// Largest serialized event (in bytes) the writer will accept; larger
    /// events are dropped with a warning.
    max_event_bytes: usize,
    /// Whether events are passed through `redact_event` before being queued.
    redact: bool,
}

/// Handle used to queue classifier events for the background writer.
pub(super) struct ClassifierLogSink {
    /// Sending half of the bounded channel consumed by the writer loop.
    sender: tokio::sync::mpsc::Sender<ClassifierLogCommand>,
    /// Number of events that could not be queued (queue full or writer gone).
    dropped_events: AtomicU64,
    /// Whether `emit` redacts events before queueing them.
    redact: bool,
}

/// Messages sent from a [`ClassifierLogSink`] to the writer loop.
enum ClassifierLogCommand {
    /// An event to serialize and append to the log file as one line.
    Event(Value),
    /// Test-only request: the writer flushes the file and then signals the
    /// enclosed one-shot sender.
    #[cfg(test)]
    Flush(tokio::sync::oneshot::Sender<()>),
}

impl ClassifierLogConfig {
    /// Creates a configuration that writes to `path` with the default queue
    /// capacity, the default event size limit, and redaction disabled.
    pub(super) fn new(path: PathBuf) -> Self {
        let queue_capacity = DEFAULT_QUEUE_CAPACITY;
        let max_event_bytes = DEFAULT_MAX_EVENT_BYTES;
        Self {
            path,
            queue_capacity,
            max_event_bytes,
            redact: false,
        }
    }

    /// Returns the configuration with the queue capacity replaced.
    ///
    /// A capacity of zero is raised to one.
    pub(super) fn with_queue_capacity(self, queue_capacity: usize) -> Self {
        Self {
            queue_capacity: queue_capacity.max(1),
            ..self
        }
    }

    /// Returns the configuration with the per-event byte limit replaced.
    pub(super) fn with_max_event_bytes(self, max_event_bytes: usize) -> Self {
        Self {
            max_event_bytes,
            ..self
        }
    }

    /// Returns the configuration with redaction switched on or off.
    pub(super) fn with_redaction(self, redact: bool) -> Self {
        Self { redact, ..self }
    }
}

impl ClassifierLogSink {
    pub(super) fn spawn(config: ClassifierLogConfig) -> Result<Arc<Self>, String> {
        tokio::runtime::Handle::try_current()
            .map_err(|err| format!("classifier log sink requires a Tokio runtime: {err}"))?;
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&config.path)
            .map_err(|err| format!("failed to open {}: {err}", config.path.display()))?;
        let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_capacity.max(1));
        let sink = Arc::new(Self {
            sender,
            dropped_events: AtomicU64::new(0),
            redact: config.redact,
        });
        spawn_writer_task(file, receiver, config.max_event_bytes, config.path);
        Ok(sink)
    }

    pub(super) fn emit(&self, event: Value) {
        let event = if self.redact {
            redact_event(event)
        } else {
            event
        };
        match self.sender.try_send(ClassifierLogCommand::Event(event)) {
            Ok(()) => {}
            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                self.record_drop("queue_full");
            }
            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                self.record_drop("writer_closed");
            }
        }
    }

    #[cfg(test)]
    pub(super) async fn flush(&self) -> Result<(), String> {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        self.sender
            .send(ClassifierLogCommand::Flush(sender))
            .await
            .map_err(|_| "classifier log writer closed".to_string())?;
        receiver
            .await
            .map_err(|_| "classifier log flush failed".to_string())
    }

    fn record_drop(&self, reason: &'static str) {
        let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
        if dropped == 1 || dropped.is_power_of_two() {
            tracing::warn!(
                target: "forge.classifier",
                reason,
                dropped,
                "dropped proxy classifier JSONL event"
            );
        }
    }
}

/// Eagerly sets up the shared sink from the environment.
///
/// The result is discarded; a failure is reported by the lookup itself.
pub(super) fn init_proxy_classifier_log_sink_from_env() {
    drop(classifier_log_sink_from_env());
}

/// Clears the shared sink slot, releasing this module's reference to it.
///
/// If the slot's lock is poisoned, the slot is left alone and the one-time
/// initialization warning is emitted instead.
pub(super) fn shutdown_proxy_classifier_log_sink() {
    let mut slot = match CLASSIFIER_LOG_SINK.lock() {
        Ok(slot) => slot,
        Err(err) => {
            warn_classifier_log_init_once(format!("classifier log lock poisoned: {err}"));
            return;
        }
    };
    drop(slot.take());
}

/// Sends `event` to the shared sink; does nothing when no sink is available.
pub(super) fn emit_proxy_classifier_jsonl(event: Value) {
    if let Some(sink) = classifier_log_sink_from_env() {
        sink.emit(event);
    }
}

fn classifier_log_sink_from_env() -> Option<Arc<ClassifierLogSink>> {
    let mut guard = match CLASSIFIER_LOG_SINK.lock() {
        Ok(guard) => guard,
        Err(err) => {
            warn_classifier_log_init_once(format!("classifier log lock poisoned: {err}"));
            return None;
        }
    };
    if let Some(sink) = guard.as_ref() {
        return Some(sink.clone());
    }

    let config = config_from_env()?;
    match ClassifierLogSink::spawn(config) {
        Ok(sink) => {
            *guard = Some(sink.clone());
            Some(sink)
        }
        Err(err) => {
            warn_classifier_log_init_once(err);
            None
        }
    }
}

/// Builds the sink configuration from environment variables.
///
/// Returns `None` when the log path variable is unset or empty. The remaining
/// settings fall back to their defaults when absent or invalid.
fn config_from_env() -> Option<ClassifierLogConfig> {
    let path = std::env::var_os(FORGE_CLASSIFIER_LOG_ENV).filter(|raw| !raw.is_empty())?;

    let queue_capacity = usize_env_or_default(
        FORGE_CLASSIFIER_LOG_QUEUE_CAPACITY_ENV,
        DEFAULT_QUEUE_CAPACITY,
    );
    let max_event_bytes = usize_env_or_default(
        FORGE_CLASSIFIER_LOG_MAX_EVENT_BYTES_ENV,
        DEFAULT_MAX_EVENT_BYTES,
    );
    let redact = bool_env_or_default(FORGE_CLASSIFIER_LOG_REDACT_ENV, false);

    let config = ClassifierLogConfig::new(PathBuf::from(path))
        .with_queue_capacity(queue_capacity)
        .with_max_event_bytes(max_event_bytes)
        .with_redaction(redact);
    Some(config)
}

/// Starts `writer_loop` through `tokio::task::spawn_blocking`.
///
/// The returned join handle is dropped immediately; nothing waits on it.
fn spawn_writer_task(
    file: File,
    receiver: tokio::sync::mpsc::Receiver<ClassifierLogCommand>,
    max_event_bytes: usize,
    path: PathBuf,
) {
    let run_writer = move || writer_loop(file, receiver, max_event_bytes, path);
    drop(tokio::task::spawn_blocking(run_writer));
}

/// Receives commands from `receiver` and appends each event to `file` as one
/// JSON line.
///
/// The loop ends when `blocking_recv` returns `None`, after which the file is
/// flushed one last time. Events that fail to serialize, exceed
/// `max_event_bytes` once serialized, or cannot be written are skipped with a
/// warning; none of these stop the loop. `path` is used only in log messages.
fn writer_loop(
    mut file: File,
    mut receiver: tokio::sync::mpsc::Receiver<ClassifierLogCommand>,
    max_event_bytes: usize,
    path: PathBuf,
) {
    while let Some(command) = receiver.blocking_recv() {
        match command {
            ClassifierLogCommand::Event(event) => {
                let mut line = match serde_json::to_string(&event) {
                    Ok(line) => line,
                    Err(err) => {
                        tracing::warn!(
                            target: "forge.classifier",
                            error = %err,
                            "failed to serialize proxy classifier JSONL event"
                        );
                        continue;
                    }
                };
                // The limit applies to the JSON text itself, not the newline.
                if line.len() > max_event_bytes {
                    tracing::warn!(
                        target: "forge.classifier",
                        event_bytes = line.len(),
                        max_event_bytes,
                        "dropped oversized proxy classifier JSONL event"
                    );
                    continue;
                }
                // Terminate the record in the buffer and hand it to the file in
                // a single `write_all`. `writeln!` would write the payload and
                // the newline separately on this unbuffered file, doubling the
                // write calls and letting another appender to the same file
                // land between a record and its terminator.
                line.push('\n');
                if let Err(err) = file.write_all(line.as_bytes()) {
                    tracing::warn!(
                        target: "forge.classifier",
                        error = %err,
                        path = %path.display(),
                        "failed to write proxy classifier JSONL event"
                    );
                }
            }
            #[cfg(test)]
            ClassifierLogCommand::Flush(done) => {
                if let Err(err) = file.flush() {
                    tracing::warn!(
                        target: "forge.classifier",
                        error = %err,
                        path = %path.display(),
                        "failed to flush proxy classifier JSONL"
                    );
                }
                // The requester may have stopped waiting; that is not an error.
                let _ = done.send(());
            }
        }
    }
    // Best-effort final flush once the channel is finished.
    let _ = file.flush();
}

/// Replaces sensitive values in `event` with the redaction placeholder.
///
/// Only keys that are already present are overwritten; nothing is added.
/// The affected locations are a fixed set of top-level keys, the `arguments`
/// entry of an object-valued `candidate_call`, and the `content` entry of
/// every object inside an array-valued `tool_results`. Non-object events are
/// returned unchanged.
fn redact_event(mut event: Value) -> Value {
    const TOP_LEVEL_KEYS: [&str; 5] = [
        "user_request",
        "initial_user_request",
        "workflow_state",
        "required_facts",
        "candidate_final_response",
    ];
    let placeholder = || Value::String(REDACTED.to_string());

    if let Some(fields) = event.as_object_mut() {
        for key in TOP_LEVEL_KEYS {
            if let Some(slot) = fields.get_mut(key) {
                *slot = placeholder();
            }
        }

        if let Some(Value::Object(call)) = fields.get_mut("candidate_call") {
            if let Some(arguments) = call.get_mut("arguments") {
                *arguments = placeholder();
            }
        }

        if let Some(Value::Array(results)) = fields.get_mut("tool_results") {
            for entry in results.iter_mut() {
                let content = entry
                    .as_object_mut()
                    .and_then(|result| result.get_mut("content"));
                if let Some(content) = content {
                    *content = placeholder();
                }
            }
        }
    }

    event
}

/// Reads environment variable `name` as a `usize`, falling back to `default`.
///
/// The default is returned when the variable is absent, blank, not valid
/// Unicode, or not a valid unsigned integer; the last two cases also log a
/// warning. Surrounding whitespace is ignored, matching how the boolean
/// settings are parsed.
fn usize_env_or_default(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) if raw.trim().is_empty() => default,
        // Parse the trimmed text: `str::parse::<usize>` rejects surrounding
        // whitespace, so a value such as " 512 " would otherwise be discarded
        // even though the blank check above already tolerates padding.
        Ok(raw) => match raw.trim().parse::<usize>() {
            Ok(value) => value,
            Err(err) => {
                tracing::warn!(
                    target: "forge.classifier",
                    name,
                    value = raw,
                    error = %err,
                    "invalid classifier log numeric environment value; using default"
                );
                default
            }
        },
        Err(std::env::VarError::NotPresent) => default,
        Err(err) => {
            tracing::warn!(
                target: "forge.classifier",
                name,
                error = %err,
                "failed to read classifier log environment value; using default"
            );
            default
        }
    }
}

/// Reads environment variable `name` as a boolean, falling back to `default`.
///
/// After trimming and ASCII-lowercasing, `1`, `true`, `yes`, and `on` mean
/// `true`, while `0`, `false`, `no`, and `off` mean `false`. An absent or
/// blank variable yields the default silently; an unreadable or unrecognized
/// value yields the default with a warning.
fn bool_env_or_default(name: &str, default: bool) -> bool {
    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(std::env::VarError::NotPresent) => return default,
        Err(err) => {
            tracing::warn!(
                target: "forge.classifier",
                name,
                error = %err,
                "failed to read classifier log environment value; using default"
            );
            return default;
        }
    };

    let normalized = raw.trim().to_ascii_lowercase();
    if normalized.is_empty() {
        return default;
    }
    if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
        return true;
    }
    if matches!(normalized.as_str(), "0" | "false" | "no" | "off") {
        return false;
    }

    tracing::warn!(
        target: "forge.classifier",
        name,
        value = raw,
        "invalid classifier log boolean environment value; using default"
    );
    default
}

/// Logs `error` as a sink initialization failure, but only on the first call
/// in the process; later calls are silent.
fn warn_classifier_log_init_once(error: String) {
    let already_warned = CLASSIFIER_LOG_INIT_WARNED.swap(true, Ordering::Relaxed);
    if already_warned {
        return;
    }
    tracing::warn!(
        target: "forge.classifier",
        error,
        "failed to initialize proxy classifier JSONL sink"
    );
}

/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub(super) fn unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}

pub(super) fn proxy_tool_call_for_json(call: &ToolCall) -> Value {
    Value::Object(
        [
            ("name".to_string(), Value::String(call.tool.clone())),
            (
                "arguments".to_string(),
                Value::Object(
                    call.args
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
            ),
        ]
        .into_iter()
        .collect(),
    )
}