1use std::io::IsTerminal;
2
3use agcodex_common::CliConfigOverrides;
4use agcodex_core::ConversationManager;
5use agcodex_core::NewConversation;
6use agcodex_core::config::Config;
7use agcodex_core::config::ConfigOverrides;
8use agcodex_core::protocol::Event;
9use agcodex_core::protocol::EventMsg;
10use agcodex_core::protocol::Submission;
11use clap::Parser;
12use tokio::io::AsyncBufReadExt;
13use tokio::io::BufReader;
14use tracing::error;
15use tracing::info;
16
17#[derive(Debug, Parser)]
18pub struct ProtoCli {
19 #[clap(skip)]
20 pub config_overrides: CliConfigOverrides,
21}
22
23pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
24 if std::io::stdin().is_terminal() {
25 anyhow::bail!("Protocol mode expects stdin to be a pipe, not a terminal");
26 }
27
28 tracing_subscriber::fmt()
29 .with_writer(std::io::stderr)
30 .init();
31
32 let ProtoCli { config_overrides } = opts;
33 let overrides_vec = config_overrides
34 .parse_overrides()
35 .map_err(anyhow::Error::msg)?;
36
37 let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
38 let conversation_manager = ConversationManager::default();
40 let NewConversation {
41 conversation_id: _,
42 conversation,
43 session_configured,
44 } = conversation_manager.new_conversation(config).await?;
45
46 let synthetic_event = Event {
48 id: "".to_string(),
50 msg: EventMsg::SessionConfigured(session_configured),
51 };
52 let session_configured_event = match serde_json::to_string(&synthetic_event) {
53 Ok(s) => s,
54 Err(e) => {
55 error!("Failed to serialize session_configured: {e}");
56 return Err(anyhow::Error::from(e));
57 }
58 };
59 println!("{session_configured_event}");
60
61 let sq_fut = {
63 let conversation = conversation.clone();
64 async move {
65 let stdin = BufReader::new(tokio::io::stdin());
66 let mut lines = stdin.lines();
67 loop {
68 let result = tokio::select! {
69 _ = tokio::signal::ctrl_c() => {
70 break
71 },
72 res = lines.next_line() => res,
73 };
74
75 match result {
76 Ok(Some(line)) => {
77 let line = line.trim();
78 if line.is_empty() {
79 continue;
80 }
81 match serde_json::from_str::<Submission>(line) {
82 Ok(sub) => {
83 if let Err(e) = conversation.submit_with_id(sub).await {
84 error!("{e:#}");
85 break;
86 }
87 }
88 Err(e) => {
89 error!("invalid submission: {e}");
90 }
91 }
92 }
93 _ => {
94 info!("Submission queue closed");
95 break;
96 }
97 }
98 }
99 }
100 };
101
102 let eq_fut = async move {
104 loop {
105 let event = tokio::select! {
106 _ = tokio::signal::ctrl_c() => break,
107 event = conversation.next_event() => event,
108 };
109 match event {
110 Ok(event) => {
111 let event_str = match serde_json::to_string(&event) {
112 Ok(s) => s,
113 Err(e) => {
114 error!("Failed to serialize event: {e}");
115 continue;
116 }
117 };
118 println!("{event_str}");
119 }
120 Err(e) => {
121 error!("{e:#}");
122 break;
123 }
124 }
125 }
126 info!("Event queue closed");
127 };
128
129 tokio::join!(sq_fut, eq_fut);
130 Ok(())
131}