Skip to main content

algocline_app/service/
status.rs

1use std::path::Path;
2
3use algocline_engine::PendingFilter;
4
5use super::AppService;
6use crate::pool::{
7    client::PoolClient,
8    protocol::{PoolRequest, PoolResponseData},
9};
10
11impl AppService {
12    /// Snapshot of all active sessions (or one by ID) for external observation.
13    ///
14    /// # Arguments
15    ///
16    /// * `session_id` - When `Some`, returns detail for one session; when `None`, lists all.
17    /// * `pending_filter` - Optional preset name or custom field-filter for pending query projection.
18    /// * `include_history` - When `true`, each snapshot includes `conversation_history` (cap=10).
19    ///   Pass `false` (the default) for lightweight high-frequency polling snapshots.
20    ///
21    /// # Returns
22    ///
23    /// JSON string with either a single session object or `{active_sessions, sessions}` list.
24    ///
25    /// # Errors
26    ///
27    /// Returns `Err` when `pending_filter` is an unknown preset name or an invalid shape.
28    pub async fn status(
29        &self,
30        session_id: Option<&str>,
31        pending_filter: Option<serde_json::Value>,
32        include_history: bool,
33    ) -> Result<String, String> {
34        let filter = self.resolve_pending_filter(pending_filter)?;
35        let snapshots = self
36            .registry
37            .list_snapshots(filter.as_ref(), include_history)
38            .await;
39
40        // If a specific session requested, return just that one
41        if let Some(sid) = session_id {
42            if let Some(snapshot) = snapshots.get(sid) {
43                let mut result = snapshot.clone();
44                // Enrich with strategy name
45                if let Ok(strategies) = self.session_strategies.lock() {
46                    if let Some(name) = strategies.get(sid) {
47                        result["strategy"] = serde_json::json!(name);
48                    }
49                }
50                result["session_id"] = serde_json::json!(sid);
51                return serde_json::to_string_pretty(&result).map_err(|e| e.to_string());
52            }
53            // Pool fallback: host_mode=true sessions live in pool_registry,
54            // not SessionRegistry. Surface them as needs_response with a
55            // `pool: true` marker so callers can distinguish backends.
56            //
57            // When include_history=true, perform an IPC round-trip to the
58            // worker via PoolClient::Status{include_history:true} and inject
59            // the returned conversation_history. IPC failures surface as a
60            // `history_warning` field on the response (additive — see
61            // CLAUDE.md §Service 層 Error 伝播 規律) rather than dropping the
62            // status reply itself.
63            let pool_reg = self.pool_registry.read().await;
64            if let Some(entry) = pool_reg.find(sid) {
65                let mut result = serde_json::json!({
66                    "status": "needs_response",
67                    "session_id": sid,
68                    "pool": true,
69                    "pid": entry.pid,
70                    "sock": entry.sock.to_string_lossy(),
71                    "version": entry.version,
72                    "created_at": entry.created_at,
73                });
74                let sock_path = entry.sock.clone();
75                drop(pool_reg);
76                if include_history {
77                    match Self::fetch_pool_history(&sock_path).await {
78                        Ok(Some(history)) => {
79                            result["conversation_history"] = history;
80                        }
81                        Ok(None) => {}
82                        Err(e) => {
83                            result["history_warning"] = serde_json::json!(e);
84                        }
85                    }
86                }
87                return serde_json::to_string_pretty(&result).map_err(|e| e.to_string());
88            }
89            return Err(format!("session '{sid}' not found (may have completed)"));
90        }
91
92        // List all active sessions — merge SessionRegistry snapshots with
93        // pool_registry live entries. Pool entries are surfaced in the same
94        // shape used by the single-session fallback (see above), with a
95        // `pool: true` marker. SessionRegistry takes precedence on sid
96        // collision (defensive — host_mode design avoids collisions).
97        // include_history is ignored on the pool path; per-session
98        // conversation_history fetch over IPC is out of scope here.
99        let pool_reg = self.pool_registry.read().await;
100        if snapshots.is_empty() && pool_reg.sessions.is_empty() {
101            return Ok(serde_json::json!({
102                "active_sessions": 0,
103                "sessions": [],
104            })
105            .to_string());
106        }
107
108        let mut sessions: Vec<serde_json::Value> = {
109            let strategies = self.session_strategies.lock().ok();
110            snapshots
111                .iter()
112                .map(|(id, snapshot)| {
113                    let mut snap = snapshot.clone();
114                    if let Some(ref strats) = strategies {
115                        if let Some(name) = strats.get(id) {
116                            snap["strategy"] = serde_json::json!(name);
117                        }
118                    }
119                    snap["session_id"] = serde_json::json!(id);
120                    snap
121                })
122                .collect()
123        };
124
125        for entry in pool_reg.sessions.iter() {
126            if snapshots.contains_key(&entry.sid) {
127                continue;
128            }
129            sessions.push(serde_json::json!({
130                "status": "needs_response",
131                "session_id": entry.sid,
132                "pool": true,
133                "pid": entry.pid,
134                "sock": entry.sock.to_string_lossy(),
135                "version": entry.version,
136                "created_at": entry.created_at,
137            }));
138        }
139        drop(pool_reg);
140
141        let result = serde_json::json!({
142            "active_sessions": sessions.len(),
143            "sessions": sessions,
144        });
145
146        serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
147    }
148
149    /// Open a one-shot UDS connection to the pool worker at `sock` and ask
150    /// it for the active session's conversation_history.
151    ///
152    /// # Returns
153    ///
154    /// - `Ok(Some(history))` — worker has an active session and returned a
155    ///   non-empty conversation_history JSON value.
156    /// - `Ok(None)` — worker is reachable but has no active session, or the
157    ///   session has no conversation_history yet.
158    /// - `Err(reason)` — IPC failure (connect / handshake / send / parse).
159    ///   Caller surfaces this as an additive `history_warning` field rather
160    ///   than dropping the status response (§Service 層 Error 伝播 規律).
161    async fn fetch_pool_history(sock: &Path) -> Result<Option<serde_json::Value>, String> {
162        let mut client = PoolClient::connect(sock)
163            .await
164            .map_err(|e| format!("pool connect failed: {e}"))?;
165        let resp = client
166            .send_request(PoolRequest::Status {
167                include_history: true,
168            })
169            .await
170            .map_err(|e| format!("pool status request failed: {e}"))?;
171        if !resp.ok {
172            return Err(resp
173                .error
174                .unwrap_or_else(|| "pool status error".to_string()));
175        }
176        match resp.data {
177            Some(PoolResponseData::Status {
178                conversation_history,
179                ..
180            }) => Ok(conversation_history),
181            other => Err(format!("unexpected pool status response: {other:?}")),
182        }
183    }
184
185    /// Decode the incoming `pending_filter` JSON value into an optional
186    /// `PendingFilter`. Preset strings read the per-request char count
187    /// from this service's `AppConfig`; custom objects use the values
188    /// declared by the caller.
189    fn resolve_pending_filter(
190        &self,
191        raw: Option<serde_json::Value>,
192    ) -> Result<Option<PendingFilter>, String> {
193        let Some(value) = raw else {
194            return Ok(None);
195        };
196        match value {
197            serde_json::Value::String(name) => PendingFilter::from_preset_with(
198                &name,
199                self.log_config.prompt_preview_chars,
200            )
201            .map(Some)
202            .ok_or_else(|| {
203                format!(
204                    "unknown pending_filter preset '{name}' (valid: \"meta\" | \"preview\" | \"full\")"
205                )
206            }),
207            serde_json::Value::Object(_) => serde_json::from_value::<PendingFilter>(value)
208                .map(Some)
209                .map_err(|e| format!("invalid pending_filter object: {e}")),
210            other => Err(format!(
211                "pending_filter must be a preset name (string) or filter object, got {}",
212                match other {
213                    serde_json::Value::Null => "null",
214                    serde_json::Value::Bool(_) => "bool",
215                    serde_json::Value::Number(_) => "number",
216                    serde_json::Value::Array(_) => "array",
217                    _ => "unknown",
218                }
219            )),
220        }
221    }
222}