agcodex_cli/
proto.rs

1use std::io::IsTerminal;
2
3use agcodex_common::CliConfigOverrides;
4use agcodex_core::ConversationManager;
5use agcodex_core::NewConversation;
6use agcodex_core::config::Config;
7use agcodex_core::config::ConfigOverrides;
8use agcodex_core::protocol::Event;
9use agcodex_core::protocol::EventMsg;
10use agcodex_core::protocol::Submission;
11use clap::Parser;
12use tokio::io::AsyncBufReadExt;
13use tokio::io::BufReader;
14use tracing::error;
15use tracing::info;
16
17#[derive(Debug, Parser)]
18pub struct ProtoCli {
19    #[clap(skip)]
20    pub config_overrides: CliConfigOverrides,
21}
22
23pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
24    if std::io::stdin().is_terminal() {
25        anyhow::bail!("Protocol mode expects stdin to be a pipe, not a terminal");
26    }
27
28    tracing_subscriber::fmt()
29        .with_writer(std::io::stderr)
30        .init();
31
32    let ProtoCli { config_overrides } = opts;
33    let overrides_vec = config_overrides
34        .parse_overrides()
35        .map_err(anyhow::Error::msg)?;
36
37    let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
38    // Use conversation_manager API to start a conversation
39    let conversation_manager = ConversationManager::default();
40    let NewConversation {
41        conversation_id: _,
42        conversation,
43        session_configured,
44    } = conversation_manager.new_conversation(config).await?;
45
46    // Simulate streaming the session_configured event.
47    let synthetic_event = Event {
48        // Fake id value.
49        id: "".to_string(),
50        msg: EventMsg::SessionConfigured(session_configured),
51    };
52    let session_configured_event = match serde_json::to_string(&synthetic_event) {
53        Ok(s) => s,
54        Err(e) => {
55            error!("Failed to serialize session_configured: {e}");
56            return Err(anyhow::Error::from(e));
57        }
58    };
59    println!("{session_configured_event}");
60
61    // Task that reads JSON lines from stdin and forwards to Submission Queue
62    let sq_fut = {
63        let conversation = conversation.clone();
64        async move {
65            let stdin = BufReader::new(tokio::io::stdin());
66            let mut lines = stdin.lines();
67            loop {
68                let result = tokio::select! {
69                    _ = tokio::signal::ctrl_c() => {
70                        break
71                    },
72                    res = lines.next_line() => res,
73                };
74
75                match result {
76                    Ok(Some(line)) => {
77                        let line = line.trim();
78                        if line.is_empty() {
79                            continue;
80                        }
81                        match serde_json::from_str::<Submission>(line) {
82                            Ok(sub) => {
83                                if let Err(e) = conversation.submit_with_id(sub).await {
84                                    error!("{e:#}");
85                                    break;
86                                }
87                            }
88                            Err(e) => {
89                                error!("invalid submission: {e}");
90                            }
91                        }
92                    }
93                    _ => {
94                        info!("Submission queue closed");
95                        break;
96                    }
97                }
98            }
99        }
100    };
101
102    // Task that reads events from the agent and prints them as JSON lines to stdout
103    let eq_fut = async move {
104        loop {
105            let event = tokio::select! {
106                _ = tokio::signal::ctrl_c() => break,
107                event = conversation.next_event() => event,
108            };
109            match event {
110                Ok(event) => {
111                    let event_str = match serde_json::to_string(&event) {
112                        Ok(s) => s,
113                        Err(e) => {
114                            error!("Failed to serialize event: {e}");
115                            continue;
116                        }
117                    };
118                    println!("{event_str}");
119                }
120                Err(e) => {
121                    error!("{e:#}");
122                    break;
123                }
124            }
125        }
126        info!("Event queue closed");
127    };
128
129    tokio::join!(sq_fut, eq_fut);
130    Ok(())
131}