gpt-image-2-web 0.6.3

Self-hosted Docker/Web server for GPT Image 2.
#![allow(unused_imports)]

use super::*;

/// Everything needed to publish incremental progress for one streaming job.
///
/// The fields are read when a partial output arrives: the identifying strings and
/// metadata are fed into the job snapshot, and `state` is used to append a queue event.
#[derive(Clone)]
pub(crate) struct StreamContext {
    /// Shared queue state; its `inner` lock is taken when a queue event is appended.
    pub(crate) state: JobQueueState,
    /// Id of the parent job. Used for snapshots and queue events, and excluded from
    /// the history cleanup performed for each partial payload.
    pub(crate) job_id: String,
    /// Command name copied into every job snapshot.
    pub(crate) command: String,
    /// Provider name copied into every job snapshot.
    pub(crate) provider: String,
    /// Creation timestamp string copied into every job snapshot.
    pub(crate) created_at: String,
    /// Job metadata; cloned into every job snapshot.
    pub(crate) metadata: Value,
}

/// A failure of a single item within a batch run.
#[derive(Debug, Clone)]
pub(crate) struct BatchItemError {
    /// Position of the failed item in the batch's list of argument sets.
    pub(crate) index: usize,
    /// Human-readable description of why the item failed.
    pub(crate) message: String,
}

/// Combined outcome of a batch run: the items that succeeded and the ones that failed.
#[derive(Debug, Clone)]
pub(crate) struct BatchRunResult {
    /// Successful payloads, each paired with the batch index of the item that produced it.
    pub(crate) payloads: Vec<(usize, Value)>,
    /// Failed items, each tagged with its batch index.
    pub(crate) errors: Vec<BatchItemError>,
}

/// Runs every argument set through `cli_json_result` on its own thread, streaming successes.
///
/// Each element of `arg_sets` is handed to a dedicated worker thread. Results are consumed
/// on the calling thread in completion order (not batch order); every successful payload is
/// passed to `on_partial` together with its batch index as soon as it arrives.
///
/// Returns a [`BatchRunResult`] whose `payloads` are sorted by batch index and whose
/// `errors` list reported failures in the order they were received. A worker that ends
/// without reporting anything (for example because it panicked) is recorded as an error
/// for its index, appended after the reported failures, so no item silently disappears
/// from the result.
///
/// An empty `arg_sets` yields an empty result without spawning any thread.
pub(crate) fn run_payloads_concurrently_streaming(
    arg_sets: Vec<Vec<String>>,
    mut on_partial: impl FnMut(usize, &Value),
) -> BatchRunResult {
    let total = arg_sets.len();
    if total == 0 {
        return BatchRunResult {
            payloads: Vec::new(),
            errors: Vec::new(),
        };
    }

    let (tx, rx) = mpsc::channel::<(usize, Result<Value, String>)>();
    for (index, args) in arg_sets.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            let result = cli_json_result(&args);
            // A send error only means the receiver is gone; nobody is left to report to.
            let _ = tx.send((index, result));
        });
    }
    // Drop the original sender so the channel disconnects once every worker has finished.
    drop(tx);

    let mut results: Vec<Option<Value>> = (0..total).map(|_| None).collect();
    let mut reported = vec![false; total];
    let mut errors = Vec::new();

    // Every worker sends at most one message, so `total` messages means all of them
    // reported. The iterator ends earlier only when a worker died without sending.
    for (index, outcome) in rx.iter().take(total) {
        reported[index] = true;
        match outcome {
            Ok(payload) => {
                on_partial(index, &payload);
                results[index] = Some(payload);
            }
            Err(message) => errors.push(BatchItemError { index, message }),
        }
    }

    // Surface workers that vanished instead of dropping their items from the result.
    errors.extend(
        reported
            .into_iter()
            .enumerate()
            .filter(|&(_, seen)| !seen)
            .map(|(index, _)| BatchItemError {
                index,
                message: "worker thread terminated before reporting a result".to_string(),
            }),
    );

    BatchRunResult {
        payloads: results
            .into_iter()
            .enumerate()
            .filter_map(|(index, payload)| payload.map(|payload| (index, payload)))
            .collect(),
        errors,
    }
}

/// Folds one finished batch item into the parent job and announces it on the queue.
///
/// Steps, in order:
/// 1. Deletes every history job referenced by `payload` except the parent job itself
///    (best effort; failures are ignored).
/// 2. Tags each output file from `payload` with `batch_index` under the `"index"` key
///    and appends it to `partials`.
/// 3. Persists a `"running"` snapshot of the parent job whose outputs are all partials
///    sorted by `"index"`, and whose output path is that of the file with index 0, if any
///    (best effort; a persistence failure is ignored).
/// 4. Appends a `"job.output_ready"` queue event carrying the batch index, the payload's
///    `output.path` (or null) and the snapshot. Skipped if the queue lock cannot be taken.
pub(crate) fn apply_partial_output(
    ctx: &StreamContext,
    partials: &mut Vec<Value>,
    batch_index: usize,
    payload: &Value,
) {
    /// Reads the numeric `"index"` tag of an output file entry, if present.
    fn batch_index_of(file: &Value) -> Option<u64> {
        file.get("index").and_then(Value::as_u64)
    }

    // Remove history entries created for this payload, keeping the parent job's own.
    for history_id in collect_history_ids(payload) {
        if history_id == ctx.job_id {
            continue;
        }
        let _ = delete_history_job(&history_id);
    }

    // Tag each new output with its batch position before recording it.
    partials.extend(
        output_files_from_payload(payload)
            .into_iter()
            .map(|mut entry| {
                if let Some(fields) = entry.as_object_mut() {
                    fields.insert("index".to_string(), json!(batch_index));
                }
                entry
            }),
    );

    // Entries without a usable index sort last.
    let mut ordered = partials.clone();
    ordered.sort_by_key(|file| batch_index_of(file).unwrap_or(u64::MAX));

    let first_path = ordered
        .iter()
        .find(|file| batch_index_of(file) == Some(0))
        .and_then(|file| file.get("path"))
        .and_then(Value::as_str)
        .map(str::to_owned);

    let snapshot = job_snapshot(JobSnapshotInput {
        id: &ctx.job_id,
        command: &ctx.command,
        provider: &ctx.provider,
        status: "running",
        created_at: &ctx.created_at,
        metadata: ctx.metadata.clone(),
        output_path: first_path,
        outputs: json!(ordered),
        error: Value::Null,
    });
    let _ = persist_job(&snapshot);

    let ready_path = match payload.get("output").and_then(|output| output.get("path")) {
        Some(path) => path.clone(),
        None => Value::Null,
    };

    // Without the queue lock there is nowhere to publish the event.
    if let Ok(mut queue) = ctx.state.inner.lock() {
        append_queue_event(
            &mut queue,
            &ctx.job_id,
            "local",
            "job.output_ready",
            json!({
                "index": batch_index,
                "path": ready_path,
                "job": snapshot,
            }),
        );
    }
}