tandem-server 0.6.5

HTTP server for Tandem engine APIs
use std::io::SeekFrom;
use std::path::{Path, PathBuf};

use anyhow::Context;
use serde_json::Value;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

pub async fn write_file_atomically(
    path: &Path,
    content: &[u8],
    store_label: &str,
) -> anyhow::Result<()> {
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent).await.with_context(|| {
            format!(
                "failed to create {store_label} directory {}",
                parent.display()
            )
        })?;
    }

    let tmp_path = tmp_path_for(path);
    let mut file = tokio::fs::File::create(&tmp_path)
        .await
        .with_context(|| format!("failed to create {store_label} {}", tmp_path.display()))?;
    file.write_all(content)
        .await
        .with_context(|| format!("failed to write {store_label} {}", tmp_path.display()))?;
    file.flush()
        .await
        .with_context(|| format!("failed to flush {store_label} {}", tmp_path.display()))?;
    file.sync_all()
        .await
        .with_context(|| format!("failed to sync {store_label} {}", tmp_path.display()))?;
    drop(file);

    tokio::fs::rename(&tmp_path, path)
        .await
        .with_context(|| format!("failed to publish {store_label} {}", path.display()))?;
    sync_parent_dir(path, store_label).await?;
    Ok(())
}

pub async fn repair_jsonl_torn_tail(path: &Path, store_label: &str) -> anyhow::Result<()> {
    let metadata = match tokio::fs::metadata(path).await {
        Ok(metadata) => metadata,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(error) => {
            return Err(error)
                .with_context(|| format!("failed to inspect {store_label} {}", path.display()))
        }
    };
    if metadata.len() == 0 {
        return Ok(());
    }

    let mut file = tokio::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(path)
        .await
        .with_context(|| format!("failed to open {store_label} {}", path.display()))?;

    file.seek(SeekFrom::End(-1))
        .await
        .with_context(|| format!("failed to seek {store_label} {}", path.display()))?;
    let mut last_byte = [0_u8; 1];
    file.read_exact(&mut last_byte)
        .await
        .with_context(|| format!("failed to read {store_label} {}", path.display()))?;
    if last_byte[0] == b'\n' {
        return Ok(());
    }

    file.seek(SeekFrom::Start(0))
        .await
        .with_context(|| format!("failed to rewind {store_label} {}", path.display()))?;
    let mut content = Vec::with_capacity(metadata.len() as usize);
    file.read_to_end(&mut content)
        .await
        .with_context(|| format!("failed to read {store_label} {}", path.display()))?;
    let repaired_len = content
        .iter()
        .rposition(|byte| *byte == b'\n')
        .map(|index| index + 1)
        .unwrap_or(0);
    let tail = &content[repaired_len..];
    if serde_json::from_slice::<Value>(tail).is_ok() {
        tracing::warn!(
            path = %path.display(),
            store = store_label,
            "repairing JSONL tail by appending missing newline"
        );
        file.seek(SeekFrom::End(0))
            .await
            .with_context(|| format!("failed to seek {store_label} {}", path.display()))?;
        file.write_all(b"\n").await.with_context(|| {
            format!(
                "failed to append missing newline to {store_label} {}",
                path.display()
            )
        })?;
        file.flush()
            .await
            .with_context(|| format!("failed to flush {store_label} {}", path.display()))?;
        file.sync_all()
            .await
            .with_context(|| format!("failed to sync repaired {store_label} {}", path.display()))?;
        drop(file);
        sync_parent_dir(path, store_label).await?;
        return Ok(());
    }

    let truncated_bytes = content.len().saturating_sub(repaired_len);

    tracing::warn!(
        path = %path.display(),
        store = store_label,
        truncated_bytes,
        "truncating torn JSONL tail before append"
    );
    file.set_len(repaired_len as u64)
        .await
        .with_context(|| format!("failed to truncate {store_label} {}", path.display()))?;
    file.sync_all()
        .await
        .with_context(|| format!("failed to sync repaired {store_label} {}", path.display()))?;
    drop(file);
    sync_parent_dir(path, store_label).await?;
    Ok(())
}

pub fn sideline_corrupt_state_file_sync(
    path: &Path,
    store_label: &str,
    parse_error: impl std::fmt::Display,
) -> anyhow::Error {
    let parse_error = parse_error.to_string();
    let corrupt_path = next_corrupt_path_for(path);
    match std::fs::rename(path, &corrupt_path) {
        Ok(()) => {
            tracing::warn!(
                path = %path.display(),
                corrupt_path = %corrupt_path.display(),
                store = store_label,
                error = %parse_error,
                "sidelined corrupt state store"
            );
            anyhow::anyhow!(
                "failed to parse {store_label} {}; corrupt store moved to {}: {parse_error}",
                path.display(),
                corrupt_path.display()
            )
        }
        Err(sideline_error) => anyhow::anyhow!(
            "failed to parse {store_label} {}; also failed to sideline corrupt store: {parse_error}; {sideline_error}",
            path.display()
        ),
    }
}

#[cfg(unix)]
pub async fn sync_parent_dir(path: &Path, store_label: &str) -> anyhow::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let parent = parent.to_path_buf();
    let parent_for_sync = parent.clone();
    tokio::task::spawn_blocking(move || {
        std::fs::File::open(&parent_for_sync).and_then(|file| file.sync_all())
    })
    .await
    .with_context(|| format!("failed to join {store_label} directory sync task"))?
    .with_context(|| {
        format!(
            "failed to sync {store_label} directory {}",
            parent.display()
        )
    })?;
    Ok(())
}

#[cfg(not(unix))]
pub async fn sync_parent_dir(_path: &Path, _store_label: &str) -> anyhow::Result<()> {
    Ok(())
}

fn tmp_path_for(path: &Path) -> PathBuf {
    let mut tmp = path.to_path_buf();
    let extension = path
        .extension()
        .and_then(|value| value.to_str())
        .map(|value| format!("{value}.tmp"))
        .unwrap_or_else(|| "tmp".to_string());
    tmp.set_extension(extension);
    tmp
}

fn next_corrupt_path_for(path: &Path) -> PathBuf {
    for attempt in 0..1000 {
        let candidate = corrupt_path_for_attempt(path, attempt);
        if !candidate.exists() {
            return candidate;
        }
    }
    corrupt_path_for_attempt(path, 1000)
}

fn corrupt_path_for_attempt(path: &Path, attempt: usize) -> PathBuf {
    let mut corrupt = path.to_path_buf();
    let extension = path
        .extension()
        .and_then(|value| value.to_str())
        .map(|value| format!("{value}.corrupt"))
        .unwrap_or_else(|| "corrupt".to_string());
    let extension = if attempt == 0 {
        extension
    } else {
        format!("{extension}.{attempt}")
    };
    corrupt.set_extension(extension);
    corrupt
}