basemind 0.6.0

Full AI context layer over MCP — tree-sitter code-map, document RAG (PDF/Office/HTML/email + OCR + reranker), shared agent memory, on-demand web crawl, git history + blame + per-symbol diff. 300+ languages, 8 coding-agent harnesses, content-addressed Fjall + LanceDB.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
//! In-process front-end + link over tokio mpsc channels.
//!
//! Used for same-process embedding (a future `basemind serve` that hosts the broker inline)
//! and for tests, which need an end-to-end client↔broker round-trip without a real socket.
//! The link skips framing entirely and moves owned [`CommsRequest`] / [`CommsOut`] values.

use std::sync::Arc;

use tokio::sync::{mpsc, watch};

use super::daemon::{Broker, Session};
use super::protocol::{CommsOut, CommsRequest};
use super::transport::{CommsFrontend, CommsLink, PeerCred};

/// Buffer depth for the per-link channels. Generous enough that a slow reader does not
/// back-pressure the broker on the common path; overflow drops the slowest sink (see the
/// broker's `fan_out`).
///
/// The same depth is used for both directions of a link (client → broker requests and
/// broker → client frames). The request/response paths in this module send with
/// `send(..).await`, which waits for free capacity rather than dropping.
const CHANNEL_DEPTH: usize = 256;

/// One end of an in-process link, handed to a client.
///
/// Obtained from [`InProcFrontend::connect`]; the matching broker-side half is driven by a
/// task spawned at the same time.
pub struct InProcClientLink {
    /// Sending side of the client → broker request channel.
    to_broker: mpsc::Sender<CommsRequest>,
    /// Receiving side of the broker → client channel (responses and notifications).
    from_broker: mpsc::Receiver<CommsOut>,
}

impl InProcClientLink {
    /// Queue `req` on the request channel towards the broker-side serve task.
    ///
    /// # Errors
    ///
    /// Yields an [`std::io::ErrorKind::BrokenPipe`] error ("broker gone") when the channel
    /// no longer accepts requests, i.e. its receiving half has gone away.
    pub async fn send_request(&self, req: CommsRequest) -> std::io::Result<()> {
        match self.to_broker.send(req).await {
            Ok(()) => Ok(()),
            Err(_closed) => Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "broker gone",
            )),
        }
    }

    /// Wait for the next frame coming back from the broker.
    ///
    /// The frame may be a response or a notification. `None` means the broker → client
    /// channel is closed and no further frames will arrive.
    pub async fn recv(&mut self) -> Option<CommsOut> {
        let inbound = &mut self.from_broker;
        inbound.recv().await
    }
}

/// The broker-side half of an in-process link.
///
/// Implements [`CommsLink`] by moving owned values over the two channels; no framing is
/// involved.
struct InProcLink {
    /// Requests sent by the client through [`InProcClientLink::send_request`].
    from_client: mpsc::Receiver<CommsRequest>,
    /// Outbound frames to the client, read via [`InProcClientLink::recv`].
    to_client: mpsc::Sender<CommsOut>,
}

impl CommsLink for InProcLink {
    /// Next request from the client; `Ok(None)` once the client side of the request
    /// channel is closed. This implementation never produces an `Err`.
    async fn recv(&mut self) -> std::io::Result<Option<CommsRequest>> {
        let next = self.from_client.recv().await;
        Ok(next)
    }

    /// Deliver `out` to the client.
    ///
    /// Fails with [`std::io::ErrorKind::BrokenPipe`] ("client gone") when the client no
    /// longer holds its receiving end.
    async fn send(&mut self, out: CommsOut) -> std::io::Result<()> {
        if self.to_client.send(out).await.is_err() {
            return Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "client gone"));
        }
        Ok(())
    }

    /// Credentials of the peer. Both ends live in this process, so the peer is trusted by
    /// construction and is reported with the daemon's uid and this process's pid.
    fn peer_cred(&self) -> PeerCred {
        let uid = Some(current_uid());
        let pid = Some(std::process::id());
        PeerCred { uid, pid }
    }
}

/// In-process front-end. Hands out client links via [`InProcFrontend::connect`]; each spawns a
/// broker-side task driving the link.
pub struct InProcFrontend {
    /// Broker handle cloned into every serve task spawned by `connect`.
    broker: Arc<Broker>,
}

impl InProcFrontend {
    /// Build a front-end over an existing broker.
    pub fn new(broker: Arc<Broker>) -> Self {
        Self { broker }
    }

    /// Open a new in-process client link and spawn its broker-side serve task. The returned
    /// [`InProcClientLink`] is the client's handle.
    pub fn connect(&self) -> InProcClientLink {
        let (to_broker, from_client) = mpsc::channel(CHANNEL_DEPTH);
        let (to_client, from_broker) = mpsc::channel(CHANNEL_DEPTH);
        let link = InProcLink {
            from_client,
            to_client: to_client.clone(),
        };
        let broker = self.broker.clone();
        tokio::spawn(async move {
            serve_link(broker, link, to_client).await;
        });
        InProcClientLink {
            to_broker,
            from_broker,
        }
    }
}

impl CommsFrontend for InProcFrontend {
    /// Park until shutdown is requested.
    ///
    /// The in-process front-end is driven by explicit `connect()` calls, so there is nothing
    /// to accept here — `serve` exists to satisfy the trait for symmetry with the UDS
    /// front-end. The `_broker` argument is ignored; links use the broker that was passed
    /// to [`InProcFrontend::new`].
    ///
    /// Returns `Ok(())` once `shutdown` holds `true` (including when it was already `true`
    /// on entry) or once the shutdown sender has been dropped. It never returns an error.
    async fn serve(
        self: Box<Self>,
        _broker: Arc<Broker>,
        mut shutdown: watch::Receiver<bool>,
    ) -> std::io::Result<()> {
        loop {
            // Look at the flag itself rather than only waiting for "a change": a bare
            // `changed()` would return on a write of `false` and would wait forever if the
            // flag had already been set (and marked seen) before `serve` started.
            // The borrow guard is released at the end of this statement, before any await.
            let stop_requested = *shutdown.borrow_and_update();
            if stop_requested {
                break;
            }
            // `Err` means the sender is gone, so no shutdown signal can ever arrive; stop
            // parking instead of waiting indefinitely.
            if shutdown.changed().await.is_err() {
                break;
            }
        }
        Ok(())
    }
}

/// Request/response pump for a single in-process link.
///
/// Each request read from `link` is handed to `broker.handle` together with a per-link
/// [`Session`] (created fresh here) and `link_tx`, the notification sink the broker registers
/// for `Subscribe`. The result is written back to the client as a [`CommsOut::Response`].
/// The pump stops when the link yields no more requests, when reading fails, or when the
/// response cannot be delivered.
async fn serve_link(broker: Arc<Broker>, mut link: InProcLink, link_tx: mpsc::Sender<CommsOut>) {
    let mut session = Session::default();
    loop {
        // End of stream and read errors both terminate the link.
        let Ok(Some(request)) = link.recv().await else {
            break;
        };
        let reply = broker.handle(request, &mut session, &link_tx).await;
        let delivered = link.send(CommsOut::Response(reply)).await;
        if delivered.is_err() {
            break;
        }
    }
}

/// Uid reported for in-process peers: whatever the UDS front-end's `daemon_uid` returns.
fn current_uid() -> u32 {
    use super::frontend_uds::daemon_uid;

    daemon_uid()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::comms::ids::{AgentId, RoomId};
    use crate::comms::model::RoomScope;
    use crate::comms::protocol::{CommsRequest, CommsResponse, PROTO_VER};
    use crate::comms::store::CommsStore;

    /// Read frames from `link` until a response arrives, skipping any notifications that
    /// come first. Panics with "frame" if the channel closes before a response is seen.
    async fn expect_response(link: &mut InProcClientLink) -> CommsResponse {
        loop {
            match link.recv().await.expect("frame") {
                CommsOut::Response(r) => return r,
                CommsOut::Notification(_) => continue,
            }
        }
    }

    /// End-to-end round-trip over two in-process links sharing one broker: hello, create
    /// room, join, post, history, body fetch, and inbox with and without `mark_read`.
    #[tokio::test]
    async fn two_links_post_and_read_history_and_inbox() {
        // Fresh on-disk store in a temp dir, so each test starts from an empty state.
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Arc::new(CommsStore::open(dir.path()).expect("store"));
        let broker = Arc::new(Broker::new(store));
        let frontend = InProcFrontend::new(broker.clone());

        let mut writer = frontend.connect();
        let mut reader = frontend.connect();

        // Both say hello (Global default room).
        for (link, name) in [(&mut writer, "writer"), (&mut reader, "reader")] {
            link.send_request(CommsRequest::Hello {
                agent: AgentId::parse(name).expect("agent"),
                proto_ver: PROTO_VER,
                remote: None,
                cwd: None,
            })
            .await
            .expect("hello");
            assert!(matches!(
                expect_response(link).await,
                CommsResponse::Welcome { .. }
            ));
        }

        // Create a shared global room and have the reader subscribe + join.
        let room = RoomId::parse("team").expect("room");
        writer
            .send_request(CommsRequest::CreateRoom {
                room: room.clone(),
                scope: RoomScope::Global,
                title: Some("Team".to_string()),
            })
            .await
            .expect("create");
        assert!(matches!(
            expect_response(&mut writer).await,
            CommsResponse::Room(_)
        ));

        reader
            .send_request(CommsRequest::Join { room: room.clone() })
            .await
            .expect("join");
        assert!(matches!(
            expect_response(&mut reader).await,
            CommsResponse::Ok
        ));

        // Writer posts.
        writer
            .send_request(CommsRequest::Post {
                room: room.clone(),
                subject: "status".to_string(),
                tags: vec!["daily".to_string()],
                reply_to: None,
                scope: vec![],
                body: b"all green".to_vec(),
            })
            .await
            .expect("post");
        let posted = expect_response(&mut writer).await;
        // Keep the id so later steps can check history and fetch the body by it.
        let message_id = match posted {
            CommsResponse::Posted { message_id } => message_id,
            other => panic!("expected Posted, got {other:?}"),
        };

        // Reader reads history → sees the front-matter (not the body).
        reader
            .send_request(CommsRequest::History {
                room: room.clone(),
                cursor: None,
                limit: Some(10),
            })
            .await
            .expect("history");
        match expect_response(&mut reader).await {
            CommsResponse::History { messages, .. } => {
                assert_eq!(messages.len(), 1);
                assert_eq!(messages[0].meta.subject, "status");
                assert_eq!(messages[0].meta.id, message_id);
                assert_eq!(messages[0].meta.body_len, "all green".len() as u32);
            }
            other => panic!("expected History, got {other:?}"),
        }

        // Reader fetches the body on demand.
        reader
            .send_request(CommsRequest::GetBody {
                message_id: message_id.clone(),
            })
            .await
            .expect("get_body");
        match expect_response(&mut reader).await {
            CommsResponse::Body { body } => {
                assert_eq!(body.as_deref(), Some(b"all green".as_ref()))
            }
            other => panic!("expected Body, got {other:?}"),
        }

        // Reader's inbox shows the unread message, then mark_read clears it.
        reader
            .send_request(CommsRequest::Inbox {
                remote: None,
                cwd: None,
                cursor: None,
                limit: Some(10),
                mark_read: true,
            })
            .await
            .expect("inbox");
        match expect_response(&mut reader).await {
            CommsResponse::Inbox { messages, .. } => {
                assert_eq!(
                    messages.len(),
                    1,
                    "the posted message is unread for the reader"
                );
                assert_eq!(messages[0].meta.subject, "status");
            }
            other => panic!("expected Inbox, got {other:?}"),
        }

        // Second inbox read after mark_read → empty.
        reader
            .send_request(CommsRequest::Inbox {
                remote: None,
                cwd: None,
                cursor: None,
                limit: Some(10),
                mark_read: false,
            })
            .await
            .expect("inbox");
        match expect_response(&mut reader).await {
            CommsResponse::Inbox { messages, .. } => {
                assert!(messages.is_empty(), "mark_read should clear the inbox");
            }
            other => panic!("expected Inbox, got {other:?}"),
        }
    }

    /// An agent's own post must not show up in (or count as unread for) its own inbox,
    /// while room history still lists it and another agent's inbox does include it.
    #[tokio::test]
    async fn inbox_excludes_self_authored_but_history_keeps_them() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Arc::new(CommsStore::open(dir.path()).expect("store"));
        let broker = Arc::new(Broker::new(store));
        let frontend = InProcFrontend::new(broker.clone());

        let mut writer = frontend.connect();
        let mut reader = frontend.connect();

        // Register both agents; each hello must be answered with a Welcome.
        for (link, name) in [(&mut writer, "author"), (&mut reader, "other")] {
            link.send_request(CommsRequest::Hello {
                agent: AgentId::parse(name).expect("agent"),
                proto_ver: PROTO_VER,
                remote: None,
                cwd: None,
            })
            .await
            .expect("hello");
            assert!(matches!(
                expect_response(link).await,
                CommsResponse::Welcome { .. }
            ));
        }

        // A global room both agents auto-join (Global scope matches every chain).
        let room = RoomId::parse("team").expect("room");
        writer
            .send_request(CommsRequest::CreateRoom {
                room: room.clone(),
                scope: RoomScope::Global,
                title: Some("Team".to_string()),
            })
            .await
            .expect("create");
        assert!(matches!(
            expect_response(&mut writer).await,
            CommsResponse::Room(_)
        ));

        // The author posts to the room.
        writer
            .send_request(CommsRequest::Post {
                room: room.clone(),
                subject: "mine".to_string(),
                tags: vec![],
                reply_to: None,
                scope: vec![],
                body: b"self note".to_vec(),
            })
            .await
            .expect("post");
        let message_id = match expect_response(&mut writer).await {
            CommsResponse::Posted { message_id } => message_id,
            other => panic!("expected Posted, got {other:?}"),
        };

        // The author's OWN inbox excludes the message (auto-join subscribed them to the room).
        writer
            .send_request(CommsRequest::Inbox {
                remote: None,
                cwd: None,
                cursor: None,
                limit: Some(10),
                mark_read: false,
            })
            .await
            .expect("inbox");
        match expect_response(&mut writer).await {
            CommsResponse::Inbox {
                messages, unread, ..
            } => {
                assert!(
                    messages.is_empty(),
                    "an agent's own post must not appear in its inbox"
                );
                assert_eq!(
                    unread, 0,
                    "self-authored messages are not unread for the author"
                );
            }
            other => panic!("expected Inbox, got {other:?}"),
        }

        // But room_history still shows it for the author — the full log is not filtered.
        writer
            .send_request(CommsRequest::History {
                room: room.clone(),
                cursor: None,
                limit: Some(10),
            })
            .await
            .expect("history");
        match expect_response(&mut writer).await {
            CommsResponse::History { messages, .. } => {
                assert_eq!(messages.len(), 1, "history keeps self-authored messages");
                assert_eq!(messages[0].meta.id, message_id);
            }
            other => panic!("expected History, got {other:?}"),
        }

        // A different agent DOES see the message in their inbox.
        reader
            .send_request(CommsRequest::Inbox {
                remote: None,
                cwd: None,
                cursor: None,
                limit: Some(10),
                mark_read: false,
            })
            .await
            .expect("inbox");
        match expect_response(&mut reader).await {
            CommsResponse::Inbox { messages, .. } => {
                assert_eq!(messages.len(), 1, "a different agent sees the message");
                assert_eq!(messages[0].meta.subject, "mine");
            }
            other => panic!("expected Inbox, got {other:?}"),
        }
    }
}