algocline_app/service/status.rs
1use std::path::Path;
2
3use algocline_engine::PendingFilter;
4
5use super::AppService;
6use crate::pool::{
7 client::PoolClient,
8 protocol::{PoolRequest, PoolResponseData},
9};
10
11impl AppService {
12 /// Snapshot of all active sessions (or one by ID) for external observation.
13 ///
14 /// # Arguments
15 ///
16 /// * `session_id` - When `Some`, returns detail for one session; when `None`, lists all.
17 /// * `pending_filter` - Optional preset name or custom field-filter for pending query projection.
18 /// * `include_history` - When `true`, each snapshot includes `conversation_history` (cap=10).
19 /// Pass `false` (the default) for lightweight high-frequency polling snapshots.
20 ///
21 /// # Returns
22 ///
23 /// JSON string with either a single session object or `{active_sessions, sessions}` list.
24 ///
25 /// # Errors
26 ///
27 /// Returns `Err` when `pending_filter` is an unknown preset name or an invalid shape.
28 pub async fn status(
29 &self,
30 session_id: Option<&str>,
31 pending_filter: Option<serde_json::Value>,
32 include_history: bool,
33 ) -> Result<String, String> {
34 let filter = self.resolve_pending_filter(pending_filter)?;
35 let snapshots = self
36 .registry
37 .list_snapshots(filter.as_ref(), include_history)
38 .await;
39
40 // If a specific session requested, return just that one
41 if let Some(sid) = session_id {
42 if let Some(snapshot) = snapshots.get(sid) {
43 let mut result = snapshot.clone();
44 // Enrich with strategy name
45 if let Ok(strategies) = self.session_strategies.lock() {
46 if let Some(name) = strategies.get(sid) {
47 result["strategy"] = serde_json::json!(name);
48 }
49 }
50 result["session_id"] = serde_json::json!(sid);
51 return serde_json::to_string_pretty(&result).map_err(|e| e.to_string());
52 }
53 // Pool fallback: host_mode=true sessions live in pool_registry,
54 // not SessionRegistry. Surface them as needs_response with a
55 // `pool: true` marker so callers can distinguish backends.
56 //
57 // When include_history=true, perform an IPC round-trip to the
58 // worker via PoolClient::Status{include_history:true} and inject
59 // the returned conversation_history. IPC failures surface as a
60 // `history_warning` field on the response (additive — see
61 // CLAUDE.md §Service 層 Error 伝播 規律) rather than dropping the
62 // status reply itself.
63 let pool_reg = self.pool_registry.read().await;
64 if let Some(entry) = pool_reg.find(sid) {
65 let mut result = serde_json::json!({
66 "status": "needs_response",
67 "session_id": sid,
68 "pool": true,
69 "pid": entry.pid,
70 "sock": entry.sock.to_string_lossy(),
71 "version": entry.version,
72 "created_at": entry.created_at,
73 });
74 let sock_path = entry.sock.clone();
75 drop(pool_reg);
76 if include_history {
77 match Self::fetch_pool_history(&sock_path).await {
78 Ok(Some(history)) => {
79 result["conversation_history"] = history;
80 }
81 Ok(None) => {}
82 Err(e) => {
83 result["history_warning"] = serde_json::json!(e);
84 }
85 }
86 }
87 return serde_json::to_string_pretty(&result).map_err(|e| e.to_string());
88 }
89 return Err(format!("session '{sid}' not found (may have completed)"));
90 }
91
92 // List all active sessions — merge SessionRegistry snapshots with
93 // pool_registry live entries. Pool entries are surfaced in the same
94 // shape used by the single-session fallback (see above), with a
95 // `pool: true` marker. SessionRegistry takes precedence on sid
96 // collision (defensive — host_mode design avoids collisions).
97 // include_history is ignored on the pool path; per-session
98 // conversation_history fetch over IPC is out of scope here.
99 let pool_reg = self.pool_registry.read().await;
100 if snapshots.is_empty() && pool_reg.sessions.is_empty() {
101 return Ok(serde_json::json!({
102 "active_sessions": 0,
103 "sessions": [],
104 })
105 .to_string());
106 }
107
108 let mut sessions: Vec<serde_json::Value> = {
109 let strategies = self.session_strategies.lock().ok();
110 snapshots
111 .iter()
112 .map(|(id, snapshot)| {
113 let mut snap = snapshot.clone();
114 if let Some(ref strats) = strategies {
115 if let Some(name) = strats.get(id) {
116 snap["strategy"] = serde_json::json!(name);
117 }
118 }
119 snap["session_id"] = serde_json::json!(id);
120 snap
121 })
122 .collect()
123 };
124
125 for entry in pool_reg.sessions.iter() {
126 if snapshots.contains_key(&entry.sid) {
127 continue;
128 }
129 sessions.push(serde_json::json!({
130 "status": "needs_response",
131 "session_id": entry.sid,
132 "pool": true,
133 "pid": entry.pid,
134 "sock": entry.sock.to_string_lossy(),
135 "version": entry.version,
136 "created_at": entry.created_at,
137 }));
138 }
139 drop(pool_reg);
140
141 let result = serde_json::json!({
142 "active_sessions": sessions.len(),
143 "sessions": sessions,
144 });
145
146 serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
147 }
148
149 /// Open a one-shot UDS connection to the pool worker at `sock` and ask
150 /// it for the active session's conversation_history.
151 ///
152 /// # Returns
153 ///
154 /// - `Ok(Some(history))` — worker has an active session and returned a
155 /// non-empty conversation_history JSON value.
156 /// - `Ok(None)` — worker is reachable but has no active session, or the
157 /// session has no conversation_history yet.
158 /// - `Err(reason)` — IPC failure (connect / handshake / send / parse).
159 /// Caller surfaces this as an additive `history_warning` field rather
160 /// than dropping the status response (§Service 層 Error 伝播 規律).
161 async fn fetch_pool_history(sock: &Path) -> Result<Option<serde_json::Value>, String> {
162 let mut client = PoolClient::connect(sock)
163 .await
164 .map_err(|e| format!("pool connect failed: {e}"))?;
165 let resp = client
166 .send_request(PoolRequest::Status {
167 include_history: true,
168 })
169 .await
170 .map_err(|e| format!("pool status request failed: {e}"))?;
171 if !resp.ok {
172 return Err(resp
173 .error
174 .unwrap_or_else(|| "pool status error".to_string()));
175 }
176 match resp.data {
177 Some(PoolResponseData::Status {
178 conversation_history,
179 ..
180 }) => Ok(conversation_history),
181 other => Err(format!("unexpected pool status response: {other:?}")),
182 }
183 }
184
185 /// Decode the incoming `pending_filter` JSON value into an optional
186 /// `PendingFilter`. Preset strings read the per-request char count
187 /// from this service's `AppConfig`; custom objects use the values
188 /// declared by the caller.
189 fn resolve_pending_filter(
190 &self,
191 raw: Option<serde_json::Value>,
192 ) -> Result<Option<PendingFilter>, String> {
193 let Some(value) = raw else {
194 return Ok(None);
195 };
196 match value {
197 serde_json::Value::String(name) => PendingFilter::from_preset_with(
198 &name,
199 self.log_config.prompt_preview_chars,
200 )
201 .map(Some)
202 .ok_or_else(|| {
203 format!(
204 "unknown pending_filter preset '{name}' (valid: \"meta\" | \"preview\" | \"full\")"
205 )
206 }),
207 serde_json::Value::Object(_) => serde_json::from_value::<PendingFilter>(value)
208 .map(Some)
209 .map_err(|e| format!("invalid pending_filter object: {e}")),
210 other => Err(format!(
211 "pending_filter must be a preset name (string) or filter object, got {}",
212 match other {
213 serde_json::Value::Null => "null",
214 serde_json::Value::Bool(_) => "bool",
215 serde_json::Value::Number(_) => "number",
216 serde_json::Value::Array(_) => "array",
217 _ => "unknown",
218 }
219 )),
220 }
221 }
222}