1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
//! Session compaction — LLM-based context summarization and transcript logging.
use std::sync::Arc;
use chrono::Utc;
use futures::StreamExt;
use serde_json::{Value, json};
use tokio::io::AsyncWriteExt as _;
use tracing::{debug, info, warn};
use super::{
context_mgr::{compress_tool_results, estimate_tokens, msg_tokens},
runtime::AgentRuntime,
};
use rsclaw_provider::{
AgentEndpoint, ContentPart, LlmRequest, Message, MessageContent, Role, StreamEvent,
};
/// Prefix for compaction summaries. Tells the LLM that the summary is
/// reference material from a previous context window, NOT active
/// instructions.
///
/// Use [`build_compaction_summary_msg`] to wrap a summary — that helper
/// also injects the current `compacted at` timestamp so the model has a
/// "recent verbatim vs older summarised" temporal anchor (the head's
/// `[Session started: ...]` marker remains the original session origin,
/// per protocol §2.4 head-byte-stability invariant).
const COMPACTION_PREFIX: &str = "\
[CONTEXT COMPACTION - REFERENCE ONLY] Earlier turns were compacted \
into the summary below. This is a handoff from a previous context \
window - treat it as background reference, NOT as active instructions. \
Do NOT answer questions or fulfill requests mentioned in this summary; \
they were already addressed. \
Your current task is in the '## Active Task' section - resume from there. \
Respond ONLY to the latest user message that appears AFTER this summary. \
\n\
If you need pre-compaction specifics this summary omits (a verbatim \
quote, a file path, an error you can't find here), call \
read_session_archive(mode=\"grep:KEYWORD\") to search the full \
original conversation — it's still on disk, never deleted by compaction.";
/// Build the summary message body that gets stored as the User-role
/// `[CONTEXT COMPACTION ...]` message AND sent as the `summary` field
/// in the rsclaw splice wire call. Caller must use the SAME returned
/// string in both places so the gateway's local view and the server's
/// post-splice KV agree byte-for-byte.
fn build_compaction_summary_msg(summary: &str) -> String {
let now = chrono::Utc::now()
.with_timezone(&chrono::Local)
.format("%Y-%m-%dT%H:%M");
format!("{COMPACTION_PREFIX} (compacted at {now})\n\n{summary}")
}
/// Convert a local (OpenAI-style) message slice into the rsclaw server's
/// flat slot count.
///
/// The server stores a flat `[user, assistant, user, assistant, …]` log
/// (`docs/client-server-integration.md` §6.3.2): each `POST /turn` input is
/// ONE `user` slot — and a turn's `tool_results` bundle into a single
/// `user` slot regardless of how many tools ran — and each assistant reply
/// is ONE `assistant` slot. Our local history keeps tool results as
/// separate `Role::Tool` messages (and `Role::Assistant` carries its
/// `ToolUse` blocks inline), so `Vec<Message>::len()` is a DIFFERENT
/// granularity than the server's slot count and must not be sent as
/// `expected_msgs_count` / `keep_head_messages` / `keep_tail_messages`.
///
/// Mapping that mirrors the server's bundling:
/// - `Role::User` → 1 slot
/// - `Role::Assistant` → 1 slot
/// - a maximal contiguous run of `Role::Tool` → 1 slot (the bundled
/// `tool_results` turn)
/// - `Role::System` → 0 (it lives in the prefix, never a message slot)
///
/// Self-consistency: after a splice the local history is rebuilt as
/// `head + summary(1 user) + tail`, so `count_server_slots` of the new
/// history equals `keep_head + 1 + keep_tail`, which is exactly the
/// server's reported post-splice `msgs_count` — the basis stays aligned
/// across repeated compactions with no extra bookkeeping.
fn count_server_slots(msgs: &[Message]) -> usize {
let mut slots = 0usize;
let mut in_tool_run = false;
for m in msgs {
match m.role {
Role::User | Role::Assistant => {
slots += 1;
in_tool_run = false;
}
Role::Tool => {
if !in_tool_run {
slots += 1;
in_tool_run = true;
}
}
Role::System => {
in_tool_run = false;
}
}
}
slots
}
/// Returns true if a JSON message value represents a compaction summary
/// (internal only). Used by both the REST API (server/mod.rs) and WebSocket
/// chat handler to filter compaction messages from user-visible history.
pub fn is_compaction_message(v: &serde_json::Value) -> bool {
let obj = match v.as_object() {
Some(o) => o,
None => return false,
};
let role = obj.get("role").and_then(|r| r.as_str()).unwrap_or("");
if role != "user" {
return false;
}
// Content can be a plain string or an array of parts.
if let Some(text) = obj.get("content").and_then(|c| c.as_str()) {
return text.starts_with("[CONTEXT COMPACTION");
}
if let Some(parts) = obj.get("content").and_then(|c| c.as_array()) {
if let Some(first_text) = parts
.first()
.and_then(|p| p.get("text"))
.and_then(|t| t.as_str())
{
return first_text.starts_with("[CONTEXT COMPACTION");
}
}
false
}
impl AgentRuntime {
/// Summarise the session history via LLM when the total character count
/// approaches `reserveTokensFloor` (approximated as floor * 4 chars/token,
/// default 100 000 chars).
///
/// **Layered mode** (default): keeps the last N user-assistant pairs
/// verbatim and only summarises the older portion, so recent context is
/// never lost. Falls back to Default/Safeguard when configured.
pub(crate) async fn compact_if_needed(&mut self, session_key: &str, model: &str) {
self.compact_inner(session_key, model, false).await;
}
/// Force compaction regardless of threshold (used by /compact).
pub(crate) async fn compact_force(&mut self, session_key: &str, model: &str) {
self.compact_inner(session_key, model, true).await;
}
pub(crate) async fn compact_inner(&mut self, session_key: &str, model: &str, force: bool) {
use rsclaw_config::schema::CompactionMode;
// Use configured compaction settings, or sensible defaults.
let cfg = self
.live
.agents
.read()
.await
.defaults
.compaction
.clone()
.unwrap_or_default();
// Compaction trigger: token threshold against the FULL prompt size
// (system + tools + message history), not message history alone.
//
// Previously this compared `sum(msg_tokens)` against an 80%-of-context
// threshold — but msg_tokens excludes the ~24K of system prompt + tool
// definitions that every request also carries. On a 64K context with
// 6K system + 17.5K tools, the backend rejects at ~40K msg_tokens, yet
// the old check didn't fire until 51K msg_tokens — so the model 413'd
// before compaction ever ran. Compare against `total` instead.
// Per-agent context window wins (agent.model.contextTokens), then
// the global default, then 64K. handle.context_window already
// resolved that chain at construction (0 = unset → use default).
let context_tokens = if self.handle.context_window > 0 {
self.handle.context_window
} else {
self.live
.agents
.read()
.await
.defaults
.context_tokens
.unwrap_or(128_000) as usize
};
// Used later to pick the splice strategy (mode 2 = rsclaw stateful).
//
// Mirror the auto-force at runtime.rs (~4277): the actual turn LLM
// calls force kv_cache_mode=2 whenever the resolved provider is
// `rsclaw`, regardless of agents.defaults.kv_cache_mode (which is
// unset → 1 by default). Compaction MUST use the same effective
// mode, otherwise the `kv_cache_mode == 2` splice path below is
// skipped: the session gets rewritten locally but the server is
// never told, so the next turn's lookup sees msgs.len() drop,
// forces a /sessions/replay, opens a NEW server session_id, and
// cold-prefills the entire post-compact history (~50s/turn,
// observed as rs_w4_* session churn in the worker log). With the
// effective mode the splice preserves session_id and the cache.
let configured_kv_mode = self
.live
.agents
.read()
.await
.defaults
.kv_cache_mode
.unwrap_or(1);
let (resolved_provider, _) = self.providers.resolve_model(model);
let kv_cache_mode = if resolved_provider == "rsclaw" {
2
} else {
configured_kv_mode
};
// Headroom to leave below the context window for: the model's reply,
// the next user message, and token-estimation drift. `reserveTokensFloor`
// (if configured) is authoritative; otherwise reserve 5% + a fixed
// margin for the next inbound message.
const NEXT_MSG_MARGIN: usize = 4_000;
let reply_reserve = (context_tokens / 20).max(2_000);
let token_threshold = if let Some(floor) = cfg.reserve_tokens_floor {
context_tokens.saturating_sub(floor as usize).max(16_000)
} else {
context_tokens
.saturating_sub(reply_reserve)
.saturating_sub(NEXT_MSG_MARGIN)
.max(16_000)
};
// Prefer the server-measured prompt size (input_tokens from the last
// turn's usage, stored as `SessionTokens.total`) — it's exact, vs the
// chars/4 heuristic which drifts 10-30% on CJK / code. Falls back to
// the estimate (msg_tokens sum + fixed sys/tools overhead) when usage
// wasn't reported (e.g. first turn, or a provider that omits usage).
let real_total = self
.handle
.session_tokens
.read()
.ok()
.and_then(|m| m.get(session_key).map(|t| t.total))
.filter(|&t| t > 0);
let total_tokens: usize = real_total.unwrap_or_else(|| {
let msgs: usize = self
.sessions
.get(session_key)
.map(|msgs| msgs.iter().map(msg_tokens).sum())
.unwrap_or(0);
// Add the fixed system+tools overhead so the estimate is
// comparable to `total` (a full-prompt number).
let overhead = self.estimate_fixed_overhead();
msgs + overhead
});
let turns = self
.compaction_state
.get(session_key)
.map(|(_, t)| *t)
.unwrap_or(0);
let token_trigger = total_tokens > token_threshold;
debug!(
session = session_key,
total_tokens, token_threshold, turns, token_trigger, force, "compaction check"
);
if !force && !token_trigger {
self.compaction_state
.entry(session_key.to_owned())
.and_modify(|(_, t)| *t += 1)
.or_insert((std::time::Instant::now(), 1));
return;
}
let trigger_reason = if token_trigger { "tokens" } else { "time" };
info!(
session = session_key,
trigger = trigger_reason,
total_tokens,
turns,
"compaction triggered"
);
let mode = cfg
.mode
.as_ref()
.cloned()
.unwrap_or(CompactionMode::Layered);
let compaction_model = cfg.model.as_deref().unwrap_or(model);
// Dynamic keepRecentPairs: reduce when token pressure is high.
let configured_pairs = cfg.keep_recent_pairs.unwrap_or(5) as usize;
let keep_pairs = if total_tokens > token_threshold * 3 {
1.max(configured_pairs / 3) // extreme pressure: keep 1-2 pairs
} else if total_tokens > token_threshold * 2 {
1.max(configured_pairs / 2) // high pressure: keep 2-3 pairs
} else {
configured_pairs // normal: use configured value
};
let extract_facts = cfg.extract_facts.unwrap_or(true);
let msgs_to_text = |msgs: &[Message]| -> String {
let default_transcript = (context_tokens * 7 / 10).max(16_000);
let max_total_tokens: usize = cfg
.max_transcript_tokens
.map(|t| t as usize)
.unwrap_or(default_transcript);
Self::msgs_to_text_static(msgs, max_total_tokens)
};
// Split messages into (head, old_portion, recent_portion) for layered mode.
// Head: first user-assistant pair (contains [Session started:...], preserved
// verbatim) Middle: compressed into summary
// Tail: recent N pairs (preserved verbatim)
let (head_msgs, old_text, recent_msgs) = if mode == CompactionMode::Layered {
let msgs = self.sessions.get(session_key).cloned().unwrap_or_default();
// Head: protect first user + first assistant (2 messages).
//
// Iterative compaction note (relevant to rsclaw §2.4 splice's
// "head sanctuary" invariant):
//
// The session rebuild below uses `[head_msgs + summary +
// recent_msgs]` order, so after the first compact `msgs[0]`
// is STILL the original first user message — NOT the summary.
// Subsequent compacts therefore find the same head boundary
// (head_end = 2) and `keep_head_messages = 2` is stable
// across N compactions, preserving the server-side KV pages
// that hold the original `[Session started: ...]` marker.
//
// The `first_is_summary` branch below intentionally
// surrenders head sanctuary in the rare case where the
// session starts with a compaction summary as `msgs[0]` —
// i.e. AFTER a `/clear` (which sets sess to a single summary
// msg) when the conversation has since grown back over
// threshold. In that case the original session head is
// already lost (/clear discarded it) and `keep_head_messages
// = 0` is the honest wire value.
let head_end = {
let first_is_summary = msgs
.first()
.and_then(|m| {
if let MessageContent::Text(t) = &m.content {
Some(t.starts_with("[CONTEXT COMPACTION"))
} else {
None
}
})
.unwrap_or(false);
if first_is_summary {
0 // post-/clear path — head sanctuary already lost
} else {
// Protect first user + first assistant pair
let mut count = 0usize;
let mut end = 0;
for (idx, m) in msgs.iter().enumerate() {
if m.role == Role::Assistant {
count += 1;
end = idx + 1;
if count >= 1 {
break;
}
}
}
end.min(msgs.len())
}
};
// Tail: count user-assistant pairs from the end.
let mut pair_count = 0usize;
let mut split_idx = msgs.len();
let mut i = msgs.len();
while i > head_end && pair_count < keep_pairs {
i -= 1;
if msgs[i].role == Role::User {
pair_count += 1;
split_idx = i;
}
}
// Ensure split doesn't overlap with head.
split_idx = split_idx.max(head_end);
// Don't split inside a tool_call/tool_result group.
// If split_idx lands on tool results, move it back to include the
// preceding assistant(tool_calls) message so the pair stays together.
while split_idx > head_end
&& split_idx < msgs.len()
&& msgs[split_idx].role == Role::Tool
{
split_idx -= 1;
}
let head = msgs[..head_end].to_vec();
let mut old_portion = msgs[head_end..split_idx].to_vec();
let recent = msgs[split_idx..].to_vec();
if old_portion.is_empty() {
return; // not enough history to compact
}
compress_tool_results(&mut old_portion, 6);
(head, msgs_to_text(&old_portion), recent)
} else {
let mut msgs = self.sessions.get(session_key).cloned().unwrap_or_default();
compress_tool_results(&mut msgs, 6);
(vec![], msgs_to_text(&msgs), vec![])
};
// Pre-compaction entity preservation: deterministic extraction (phone/ID/email)
// plus LLM-based semantic extraction (name, birthday, zodiac, etc.)
// Both run BEFORE the LLM summary to guarantee no data loss.
{
let entities = crate::context_mgr::extract_key_entities(&old_text);
if !entities.is_empty() {
if let Some(ref mem) = self.memory {
let scope = format!("agent:{}", self.handle.id);
crate::context_mgr::write_entity_memories(mem, &scope, entities).await;
debug!(
session = session_key,
"pre-compaction deterministic entities pinned"
);
}
}
// LLM-based entity extraction is now handled by the summary prompt
// (Entities section), parsed after summary generation below.
}
// Detect previous compaction summary for iterative update.
let previous_summary = {
let msgs = self.sessions.get(session_key).cloned().unwrap_or_default();
msgs.iter().find_map(|m| {
if let MessageContent::Text(t) = &m.content {
if t.starts_with("[CONTEXT COMPACTION") {
let summary_start = t.find("\n\n").map(|i| i + 2).unwrap_or(0);
Some(t[summary_start..].to_owned())
} else {
None
}
} else {
None
}
})
};
// Summarise the old portion. Three mutually-exclusive paths
// keyed off the agent's `kv_cache_mode`:
//
// mode = 2 → rsclaw stateful incremental protocol. The
// summary request rides the SAME `/sessions/<id>/turn`
// as a normal turn — worker has prefix+history
// cached, delta is just the "summarize" instruction
// (~200 tokens). On failure we DO NOT fall back to
// `compact_single`: that path sends mode=0 with the
// full history as a fresh user message, which the
// rsclaw provider rejects up-front (so the fallback
// is also useless) AND if it somehow reached the
// worker, would force a 150-270s cold prefill of
// the entire 30-50k-token history — wedging a slot
// the whole time, at exactly the moment the worker
// is already stressed. Skip this round, retry next
// turn. Also ignore `CompactionMode::Safeguard`:
// its chunked-replay design fights the incremental
// session model.
//
// mode = 1 → legacy slot-cache. Worker keeps the prefix in
// its slot; the compact-with-kv-cache fast path
// appends a summary message and lets the slot
// cache amortize the prefix. On failure, falling
// back to `compact_single` is safe: same provider,
// same prefix bytes, slot still re-uses the cache.
// No worker-side protocol mismatch.
//
// mode = 0 / Safeguard
// → no KV cache assumption — always standalone
// `compact_single` (or chunked variant). Same as
// pre-incremental behavior.
// Keep the working plan visible to the summarizer even when the todo
// tool result has been sketched out of the transcript. The template's
// "## Plan" slot tells the model where to carry it. (The kv_cache_mode=2
// path injects inside compact_with_kv_cache — its transcript lives on
// the worker, not in old_text.)
let old_text = match self.load_todo_rendered(session_key) {
Some(plan) => {
format!("{old_text}\n\n[CURRENT PLAN — todo tool state at compaction]\n{plan}")
}
None => old_text,
};
let summary = if kv_cache_mode == 2 {
match self
.compact_with_kv_cache(
session_key,
compaction_model,
&old_text,
previous_summary.as_deref(),
)
.await
{
Some(s) => Some(s),
None => {
warn!(
session = session_key,
"rsclaw KV-cache compaction failed; skipping this round (no fallback \
to standalone — would force a 150-270s worker re-prefill of the \
entire history). Next turn will retry. Investigate the \
kv_cache_mode=2 turn in the rsclaw provider log."
);
None
}
}
} else if kv_cache_mode == 1 && mode != CompactionMode::Safeguard {
let result = self
.compact_with_kv_cache(
session_key,
compaction_model,
&old_text,
previous_summary.as_deref(),
)
.await;
if result.is_some() {
result
} else {
// Legacy slot-cache fallback is harmless: same provider,
// same prefix bytes, worker slot still amortizes.
info!(
session = session_key,
"KV cache compact failed, falling back to standalone"
);
self.compact_single(compaction_model, &old_text, previous_summary.as_deref())
.await
}
} else {
match mode {
CompactionMode::Default | CompactionMode::Layered => {
self.compact_single(compaction_model, &old_text, previous_summary.as_deref())
.await
}
CompactionMode::Safeguard => {
const CHUNK_SIZE: usize = 40_000;
let chunks: Vec<&str> = {
let mut result = Vec::new();
let mut remaining = old_text.as_str();
while !remaining.is_empty() {
let mut end = CHUNK_SIZE.min(remaining.len());
while end < remaining.len() && !remaining.is_char_boundary(end) {
end -= 1;
}
let (chunk, rest) = remaining.split_at(end);
result.push(chunk);
remaining = rest;
}
result
};
let mut combined = String::new();
for chunk in chunks {
match self.compact_single(compaction_model, chunk, None).await {
Some(s) => {
combined.push_str(&s);
combined.push('\n');
}
None => return,
}
}
if combined.is_empty() {
None
} else {
Some(combined)
}
}
}
};
let Some(summary) = summary else { return };
// -- Entity extraction from summary's Entities section --
// The summary prompt includes an Entities section (kind=value format).
// Parse it and write as pinned memories — no extra LLM call needed.
if let Some(ref mem) = self.memory {
let entities = parse_entities_from_summary(&summary);
if !entities.is_empty() {
let scope = format!("agent:{}", self.handle.id);
crate::context_mgr::write_entity_memories(mem, &scope, entities).await;
debug!(
session = session_key,
"entities extracted from compaction summary"
);
}
}
// -- Key fact extraction: store important facts in long-term memory --
if extract_facts {
if let Some(facts) = self.extract_key_facts(compaction_model, &old_text).await {
if let Some(ref mem) = self.memory {
let scope = format!("agent:{}", self.handle.id);
let mut guard = mem.lock().await;
for fact in facts.lines().filter(|l| !l.trim().is_empty()) {
let fact_text = fact.trim_start_matches("- ").trim();
if fact_text.len() > 5 {
let doc = crate::memory::MemoryDoc {
id: format!("cf-{}", uuid::Uuid::new_v4()),
scope: scope.clone(),
kind: "compaction_fact".to_owned(),
text: fact_text.to_owned(),
vector: vec![],
created_at: 0, // filled by add()
accessed_at: 0,
access_count: 0,
importance: 0.7, // higher than default
tier: Default::default(),
abstract_text: None,
overview_text: None,
tags: vec![],
pinned: false,
};
if let Err(e) = guard.add(doc).await {
tracing::warn!("compaction fact memory add failed: {e:#}");
}
}
}
drop(guard);
debug!(
session = session_key,
"key facts extracted to long-term memory"
);
}
}
}
// Build the single summary message text used in BOTH the local
// session rebuild AND the rsclaw splice wire call. Same bytes
// both places — guarantees the gateway's local view matches
// whatever the server prefills into the spliced KV slot.
let summary_body = build_compaction_summary_msg(&summary);
// -- rsclaw splice (protocol §2.4) ------------------------------
// Only attempt when running under kv_cache_mode=2 + a registered
// rsclaw provider. The trait's default `compact_splice` returns
// Err for non-rsclaw providers, so we could call unconditionally,
// but the provider lookup keeps the log noise (and the wire-call
// attempt) off non-rsclaw paths entirely.
//
// On success: server has spliced KV in place; session_id is
// preserved; provider has updated its cached SessionEntry's
// last_seen_msgs_len to the post-splice value, so the next
// turn's `lookup_and_bump` will NOT misinterpret the upcoming
// local msgs.len() drop as a history-trim signal.
//
// On failure: log + fall through. The lazy fallback is the
// existing replay path — after we rewrite self.sessions[key]
// below, the next turn's `lookup_and_bump` sees msgs.len() <
// last_seen_msgs_len (unchanged from pre-splice) and returns
// None, forcing a /sessions/replay against the new history.
// No need for an active replay() call here — compaction has no
// LlmRequest to construct one from anyway, and the natural
// replay path is already battle-tested.
if kv_cache_mode == 2 {
let (resolved_provider, _) = self.providers.resolve_model(model);
if let Ok(provider) = self.providers.get(resolved_provider) {
// keep_head/keep_tail/expected MUST be in the server's
// flat-slot unit, not local Vec<Message> length — see
// count_server_slots. head is the first user+assistant
// (2 slots, or 0 post-/clear); tail is the kept recent
// window collapsed to server slots.
let head_count = count_server_slots(&head_msgs);
let tail_count = count_server_slots(&recent_msgs);
// Honest Option: when there's no cached session locally
// we send None instead of misleading Some(0) — the
// server would 409 against 0 and the log would
// erroneously claim "drift" instead of "we didn't have
// a baseline". Server treats `None` as "skip the
// optimistic check".
let expected: Option<usize> = self
.sessions
.get(session_key)
.map(|m| count_server_slots(m));
match provider
.compact_splice(session_key, head_count, &summary_body, tail_count, expected)
.await
{
Ok(new_msgs_count) => {
info!(
session = session_key,
head = head_count,
tail = tail_count,
msgs_after = new_msgs_count,
"rsclaw compact: spliced in place, session_id preserved"
);
}
Err(e) => {
warn!(
session = session_key,
error = %e,
"rsclaw compact: splice failed, next turn will replay against \
the post-compact history"
);
}
}
}
}
// Replace session history: head + summary + tail.
// Head: first user-assistant pair (contains [Session started:...])
// Summary: wrapped with compaction prefix
// Tail: recent messages kept verbatim
if let Some(sess) = self.sessions.get_mut(session_key) {
let summary_msg = Message {
role: Role::User,
content: MessageContent::Text(summary_body),
rsclaw_hidden: None,
};
sess.clear();
// Head: preserved verbatim (first user + first assistant).
sess.extend(head_msgs);
// Summary: compacted middle portion.
sess.push(summary_msg);
// Tail: recent messages kept verbatim.
sess.extend(recent_msgs);
}
// Reset compaction state after successful compaction.
self.compaction_state
.insert(session_key.to_owned(), (std::time::Instant::now(), 0));
// Compaction is a natural refresh point: pick up any skills installed
// (skill_install) since the last load. Cache-safe — only re-prefills
// the per-session user_system layer, and only if the set changed.
self.reload_skills();
// Persist compacted session to redb (survives restarts).
if let Some(sess) = self.sessions.get(session_key) {
if let Err(e) = self.store.db.delete_session(session_key) {
tracing::warn!("compaction: failed to delete old session: {e:#}");
}
for msg in sess.iter() {
let val = serde_json::to_value(msg).unwrap_or_default();
if let Err(e) = self.store.db.append_message(session_key, &val) {
tracing::warn!("compaction: failed to persist message: {e:#}");
}
}
}
// Note: plugins/skills no longer cached at runtime level — they
// are rebuilt every turn inside `build_user_system`, so no
// explicit invalidation is needed here.
let new_tokens: usize = self
.sessions
.get(session_key)
.map(|msgs| msgs.iter().map(msg_tokens).sum())
.unwrap_or(0);
info!(
session = session_key,
tokens_before = total_tokens,
tokens_after = new_tokens,
keep_pairs,
"auto-compaction complete (layered)"
);
// If compaction barely helped (still >80% of threshold), inject a
// system hint so the agent will relay the /new suggestion to the user.
if new_tokens > token_threshold * 4 / 5 {
let zh = rsclaw_i18n::default_lang() == "zh";
let hint = if zh {
"[system] 上下文压缩后仍然较大,响应可能变慢。请告知用户发送 /new 开启新会话以恢复正常速度。"
} else {
"[system] Context is still large after compaction and responses may slow down. Please tell the user to send /new to start a fresh session."
};
if let Some(sess) = self.sessions.get_mut(session_key) {
sess.push(Message {
role: Role::System,
content: MessageContent::Text(hint.to_owned()),
rsclaw_hidden: None,
});
}
warn!(
session = session_key,
tokens_after = new_tokens,
threshold = token_threshold,
"compaction insufficient, /new recommended"
);
}
// Persist compaction marker to transcript.
self.append_transcript(
session_key,
"[auto-compaction triggered]",
&format!("[summary: {summary}]"),
)
.await;
}
/// Render messages as plain text transcript with two-pass budget
/// allocation.
///
/// Total output is capped at `max_total_tokens` to avoid blowing up the
/// compact LLM's context window. Recent messages get full detail first;
/// older messages get progressively reduced detail until budget is
/// exhausted.
pub(crate) fn msgs_to_text_static(msgs: &[Message], max_total_tokens: usize) -> String {
// Helper: truncate to N chars (UTF-8 safe).
fn trunc(s: &str, max: usize) -> String {
match s.char_indices().nth(max) {
None => s.to_owned(),
Some((byte_idx, _)) => {
let mut t = s[..byte_idx].to_owned();
t.push_str("...[truncated]");
t
}
}
}
// Helper: smart-truncate tool_call args.
fn compact_args(input: &Value) -> String {
const BULK_FIELDS: &[&str] = &["content", "old_string", "new_string"];
const MAX_BULK: usize = 300;
const MAX_CMD: usize = 500;
const MAX_TOTAL: usize = 2000;
if let Some(obj) = input.as_object() {
let needs = obj.iter().any(|(k, v)| {
let limit = if BULK_FIELDS.contains(&k.as_str()) {
MAX_BULK
} else if k == "command" {
MAX_CMD
} else {
return false;
};
v.as_str()
.map(|s| s.char_indices().nth(limit).is_some())
.unwrap_or(false)
});
if needs {
let mut compact = serde_json::Map::new();
for (k, v) in obj {
let limit = if BULK_FIELDS.contains(&k.as_str()) {
Some(MAX_BULK)
} else if k == "command" {
Some(MAX_CMD)
} else {
None
};
if let (Some(lim), Some(s)) = (limit, v.as_str()) {
compact.insert(k.clone(), Value::String(trunc(s, lim)));
} else {
compact.insert(k.clone(), v.clone());
}
}
let ser = serde_json::to_string(&Value::Object(compact)).unwrap_or_default();
return if ser.char_indices().nth(MAX_TOTAL).is_some() {
trunc(&ser, MAX_TOTAL)
} else {
ser
};
}
}
let full = serde_json::to_string(input).unwrap_or_default();
if full.char_indices().nth(MAX_TOTAL).is_some() {
trunc(&full, MAX_TOTAL)
} else {
full
}
}
// Render a single message at the given detail level:
// 2 = full (tool args + results), 1 = medium, 0 = minimal
let render_msg = |m: &Message, detail: u8| -> String {
let role = format!("{:?}", m.role).to_lowercase();
let body = match &m.content {
MessageContent::Text(t) => {
if detail == 0 {
trunc(t, 200)
} else {
t.clone()
}
}
MessageContent::Parts(parts) => parts
.iter()
.filter_map(|p| match p {
ContentPart::Text { text } => Some(if detail == 0 {
trunc(text, 200)
} else {
text.clone()
}),
ContentPart::ToolUse { name, input, .. } => match detail {
2 => Some(format!("[tool_call: {name}({})]", compact_args(input))),
1 => Some(format!(
"[tool_call: {name}({})]",
trunc(&serde_json::to_string(input).unwrap_or_default(), 100)
)),
_ => Some(format!("[tool_call: {name}]")),
},
ContentPart::ToolResult {
tool_use_id: _,
content,
..
} => match detail {
2 => Some(format!("[tool_result: {}]", trunc(content, 800))),
1 => Some(format!("[tool_result: {}]", trunc(content, 150))),
_ => None,
},
ContentPart::Image { .. } => Some("[image]".to_owned()),
#[allow(unreachable_patterns)]
_ => None,
})
.collect::<Vec<_>>()
.join(" "),
};
format!("{role}: {body}")
};
// Pass 1: full detail, check if within budget.
let full: Vec<String> = msgs.iter().map(|m| render_msg(m, 2)).collect();
let full_tokens: Vec<usize> = full.iter().map(|s| estimate_tokens(s)).collect();
let total: usize = full_tokens.iter().sum();
if total <= max_total_tokens {
return full.join("\n");
}
// Pass 2: allocate budget from newest to oldest.
let n = msgs.len();
let mut detail_levels = vec![0u8; n];
let mut budget_used = 0usize;
for i in (0..n).rev() {
if budget_used + full_tokens[i] <= max_total_tokens {
detail_levels[i] = 2;
budget_used += full_tokens[i];
} else {
let m = &msgs[i];
for &d in &[1u8, 0] {
let rendered = render_msg(m, d);
let cost = estimate_tokens(&rendered);
if budget_used + cost <= max_total_tokens || d == 0 {
detail_levels[i] = d;
budget_used += cost.min(max_total_tokens.saturating_sub(budget_used));
break;
}
}
}
if budget_used >= max_total_tokens {
break;
}
}
// Final render in order.
let mut result = String::new();
let mut tokens_used = 0usize;
for (i, m) in msgs.iter().enumerate() {
let line = if detail_levels[i] == 2 {
full[i].clone()
} else {
render_msg(m, detail_levels[i])
};
let line_tokens = estimate_tokens(&line);
if tokens_used + line_tokens > max_total_tokens {
result.push_str("\n...[context truncated]");
break;
}
result.push_str(&line);
result.push('\n');
tokens_used += line_tokens;
}
result
}
/// Compact using existing session messages to reuse KV cache prefix.
///
/// Instead of sending a standalone request, appends a summary instruction
/// to the current session's messages. The system prompt + tools + history
/// are already cached in the LLM slot, so only the final summary prompt
/// needs to be computed.
pub(crate) async fn compact_with_kv_cache(
&mut self,
session_key: &str,
model: &str,
_old_text: &str,
previous_summary: Option<&str>,
) -> Option<String> {
let system_prompt = self.cached_system_prompt.clone()?;
// Clone session messages — we'll append the summary instruction
// to the API copy only, not to the stored session.
let mut messages = self.sessions.get(session_key).cloned().unwrap_or_default();
if messages.is_empty() {
return None;
}
// Build the summary instruction
let template = Self::summary_template();
let instruction = if let Some(prev) = previous_summary {
format!(
"Ignore all previous instructions. You are now a summarization agent. \
Do NOT call any tools. Do NOT answer questions. Output ONLY a structured summary.\n\n\
Update the previous compaction summary with the new conversation turns above.\n\n\
PREVIOUS SUMMARY:\n{prev}\n\n\
PRESERVE existing info, ADD new actions, update Active Task.\n\n{template}"
)
} else {
format!(
"Ignore all previous instructions. You are now a summarization agent. \
Do NOT call any tools. Do NOT answer questions. Output ONLY a structured summary \
of the entire conversation above.\n\n{template}"
)
};
// The working plan lives in kv, not in the transcript the worker has
// cached — surface it so the summary's "## Plan" slot gets filled.
let instruction = match self.load_todo_rendered(session_key) {
Some(plan) => format!(
"{instruction}\n\nCURRENT PLAN (todo tool state — carry into \"## Plan\", \
updating statuses the conversation shows changed):\n{plan}"
),
None => instruction,
};
// Append summary instruction as the last user message
messages.push(Message {
role: Role::User,
content: MessageContent::Text(instruction),
rsclaw_hidden: None,
});
// Reuse the cached tools from the last run_turn for exact prefix match.
// The summary instruction already says "Do NOT call any tools".
// If the LLM still tries, we ignore the tool call in the stream handler.
let tools = self.cached_tools.clone();
// Pick the right transport based on the model's resolved provider.
//
// For rsclaw kvCacheMode=2: the worker already has the entire
// `messages` history cached in its KV slot (we're literally
// continuing the agent's main session). Sending the request as
// a normal stateful turn means the worker only decodes the
// newly-appended summary instruction (~200-300 tokens) and
// streams back the summary. The legacy mode=0 path with
// `session_key: None` looked like a fresh standalone request
// to the provider — it would either be rejected (rsclaw
// requires mode=2) or re-decode all 30-40k tokens from
// scratch, taking tens of seconds.
//
// For non-rsclaw providers (anthropic, openai, ollama, ...):
// mode=0 is the legacy slot-cache-aware path that those
// providers already understand. Keep that behavior so the
// change is rsclaw-only and doesn't disturb other backends.
let (resolved_provider, _) = self.providers.resolve_model(model);
let (kv_cache_mode, session_key_opt) = if resolved_provider == "rsclaw" {
(2, Some(session_key.to_owned()))
} else {
(0, None)
};
let req = LlmRequest {
fallback_models: Vec::new(),
model: model.to_owned(),
messages,
tools,
system: Some(system_prompt),
max_tokens: Some(4096),
temperature: None,
frequency_penalty: None,
thinking_budget: None,
endpoint: Default::default(),
kv_cache_mode,
session_key: session_key_opt,
system_shared: None,
user_system: None,
recall: None,
};
let providers = Arc::clone(&self.providers);
let mut stream = match self.failover.call(req, &providers).await {
Ok(s) => s,
Err(e) => {
warn!("KV cache compact LLM call failed: {e:#}");
return None;
}
};
let mut summary = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => summary.push_str(&d),
Ok(StreamEvent::ReasoningDelta(_)) => {}
Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
Ok(StreamEvent::ToolCall { .. }) => {
// LLM tried to call a tool despite empty tools list — skip
warn!("compact_with_kv_cache: unexpected tool call, ignoring");
}
Err(e) => {
warn!("KV cache compact stream error: {e:#}");
return None;
}
}
}
if summary.is_empty() {
None
} else {
info!(
"compact_with_kv_cache: summary generated ({} chars)",
summary.len()
);
Some(summary)
}
}
/// Shared summary template used by both standalone and KV cache modes.
fn summary_template() -> &'static str {
"Use this exact structure:\n\n\
## Active Task\n\
[THE MOST IMPORTANT FIELD. Copy the user's most recent unfulfilled request \
verbatim. If no outstanding task, write \"None.\"]\n\n\
## Goal\n[Overall goal]\n\n\
## Constraints & Preferences\n\
[User-stated constraints or preferences mentioned anywhere in this session: \
response language (Chinese / English), tooling choice (npm vs pnpm, Rust \
edition, package manager), style rules, deadlines, banned approaches \
(\"no force-push\", \"keep backward compatibility\"), required-platform \
compatibility (Windows, Linux). If none: (none)]\n\n\
## Completed\n[Numbered list: N. ACTION target - outcome]\n\n\
## Active State\n[Modified files, test status, running processes, branch]\n\n\
## In Progress\n[Work underway when compaction fired]\n\n\
## Plan\n[The current working-plan checklist if one was provided — copy the \
[x]/[>]/[ ] lines verbatim, updating statuses the conversation shows changed. \
If none: (none)]\n\n\
## Key Data\n[Exact values verbatim: file paths, URLs, IDs, phone numbers]\n\n\
## Decisions\n[Technical decisions and WHY]\n\n\
## Pending\n[Blocked items or awaiting user]\n\n\
## Resolved Questions\n[Already answered — include the answer]\n\n\
## Files\n[Files read/modified/created]\n\n\
## Entities\n[kind=value per line. Kinds: name, phone, id_card, email, birthday, \
age, zodiac, address, relationship, preference. If none: (none)]\n\n\
CRITICAL: Copy ALL values character-for-character. Be CONCRETE."
}
/// Call the LLM once with a summarization prompt and return the text.
///
/// Supports iterative updates: if `previous_summary` is provided (from a
/// prior compaction), the LLM updates it with new turns instead of
/// starting from scratch. This preserves information across multiple
/// compactions.
pub(crate) async fn compact_single(
&mut self,
model: &str,
history: &str,
previous_summary: Option<&str>,
) -> Option<String> {
let preamble = "You are a summarization agent creating a context checkpoint. \
Your output will be injected as reference for a DIFFERENT assistant that \
continues the conversation. Do NOT respond to any questions or requests \
in the conversation - only output the structured summary. \
Do NOT include any preamble, greeting, or prefix.";
let template = Self::summary_template();
let prompt = if let Some(prev) = previous_summary {
format!(
"{preamble}\n\n\
You are updating a context compaction summary. A previous compaction \
produced the summary below. New conversation turns have occurred \
since then and need to be incorporated.\n\n\
PREVIOUS SUMMARY:\n{prev}\n\n\
NEW TURNS TO INCORPORATE:\n{history}\n\n\
Update the summary using this exact structure. PRESERVE all existing \
information that is still relevant. ADD new completed actions \
(continue numbering). Move items from \"In Progress\" to \"Completed\" \
when done. Update \"Active State\" to reflect current state. \
Remove information only if clearly obsolete. \
CRITICAL: Update \"## Active Task\" to the user's most recent \
unfulfilled request.\n\n{template}"
)
} else {
format!(
"{preamble}\n\n\
Create a structured handoff summary. The next assistant should \
understand what happened without re-reading the original turns.\n\n\
TURNS TO SUMMARIZE:\n{history}\n\n\
Use this exact structure:\n\n{template}"
)
};
let req = LlmRequest {
fallback_models: Vec::new(),
model: model.to_owned(),
messages: vec![Message {
role: Role::User,
content: MessageContent::Text(prompt),
rsclaw_hidden: None,
}],
tools: vec![], // no tools — compact must only produce text
system: None, // preamble is in the user message
max_tokens: Some(4096),
temperature: None,
frequency_penalty: None,
thinking_budget: None,
endpoint: AgentEndpoint::Flash,
kv_cache_mode: 0,
session_key: None,
system_shared: None,
user_system: None,
recall: None,
};
let providers = Arc::clone(&self.providers);
let mut stream = match self.failover.call(req, &providers).await {
Ok(s) => s,
Err(e) => {
warn!("compaction LLM call failed: {e:#}");
return None;
}
};
let mut summary = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => summary.push_str(&d),
Ok(StreamEvent::ReasoningDelta(_)) => {} // ignore reasoning in compaction
Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
Ok(StreamEvent::ToolCall { .. }) => {} // unexpected in summarization
Err(e) => {
warn!("compaction stream error: {e:#}");
return None;
}
}
}
if summary.is_empty() {
None
} else {
Some(summary)
}
}
/// Extract key facts (names, IDs, decisions, file paths) from a
/// conversation transcript for long-term memory storage.
pub(crate) async fn extract_key_facts(&mut self, model: &str, history: &str) -> Option<String> {
// Limit input to avoid huge summarisation calls.
let input = if history.len() > 60_000 {
let mut end = 60_000;
while end < history.len() && !history.is_char_boundary(end) {
end += 1;
}
&history[..end]
} else {
history
};
let req = LlmRequest {
fallback_models: Vec::new(),
model: model.to_owned(),
messages: vec![Message {
role: Role::User,
content: MessageContent::Text(format!(
"Extract the key facts from this conversation that should be remembered \
long-term. Output ONLY a bullet list (one fact per line, prefixed with \
'- '). Include: names, user IDs, chat IDs, phone numbers, account numbers, \
any numeric sequences that were looked up or confirmed, important decisions, \
file paths, URLs, preferences, and action items. \
IMPORTANT: copy numeric values (phone numbers, IDs) character-for-character — \
never truncate or paraphrase them. Be concise. Skip ephemeral chit-chat.\n\n{input}"
)),
rsclaw_hidden: None,
}],
tools: vec![],
system: Some(
"You extract key facts from conversations. Output only a bullet list.".to_owned(),
),
max_tokens: Some(1024),
temperature: None,
frequency_penalty: None,
thinking_budget: None,
endpoint: AgentEndpoint::Flash,
kv_cache_mode: 0,
session_key: None,
system_shared: None,
user_system: None,
recall: None,
};
let providers = Arc::clone(&self.providers);
let mut stream = match self.failover.call(req, &providers).await {
Ok(s) => s,
Err(e) => {
warn!("key fact extraction failed: {e:#}");
return None;
}
};
let mut result = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => result.push_str(&d),
Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
_ => {}
}
}
if result.is_empty() {
None
} else {
Some(result)
}
}
// -----------------------------------------------------------------------
// JSONL transcript (AGENTS.md $20 step 11)
// -----------------------------------------------------------------------
/// Append user + assistant messages to
/// `<base_dir>/transcripts/<key>.jsonl`. Routes through
/// `config::loader::base_dir()` so non-default profiles
/// (`--dev`, `--profile`) write transcripts under the matching base dir.
pub(crate) async fn append_transcript(
&self,
session_key: &str,
user_text: &str,
assistant_text: &str,
) {
let transcripts_dir = rsclaw_config::loader::base_dir().join("transcripts");
// Sanitize session key for use as a filename.
let safe_key: String = session_key
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '-' {
c
} else {
'_'
}
})
.collect();
let path = transcripts_dir.join(format!("{safe_key}.jsonl"));
if let Err(e) = tokio::fs::create_dir_all(&transcripts_dir).await {
warn!("transcript mkdir: {e:#}");
return;
}
let ts = Utc::now().to_rfc3339();
let mut lines = String::new();
for (role, content) in [("user", user_text), ("assistant", assistant_text)] {
let entry = json!({
"role": role,
"content": content,
"session": session_key,
"agent": self.handle.id,
"ts": ts,
});
if let Ok(s) = serde_json::to_string(&entry) {
lines.push_str(&s);
lines.push('\n');
}
}
match tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
{
Ok(mut f) => {
if let Err(e) = f.write_all(lines.as_bytes()).await {
warn!("transcript write: {e:#}");
}
}
Err(e) => warn!("transcript open: {e:#}"),
}
}
}
/// Parse `## Entities` section from a compaction summary.
///
/// Expected format (one per line):
/// ```text
/// ## Entities
/// name=小王
/// phone=18674030927
/// birthday=1995年3月15日
/// ```
fn parse_entities_from_summary(summary: &str) -> Vec<crate::context_mgr::KeyEntity> {
let mut entities = Vec::new();
// Find the Entities section
let section_start = summary.find("## Entities");
let Some(start) = section_start else {
return entities;
};
let content = &summary[start..];
// Take lines until next ## section or end of string
let section_end = content[3..]
.find("\n## ")
.map(|i| i + 3)
.unwrap_or(content.len());
let section = &content[..section_end];
let kind_to_label: &[(&str, &str, &'static str)] = &[
("name", "用户姓名", "name"),
("phone", "用户手机号", "phone_number"),
("id_card", "用户身份证", "id_card"),
("email", "用户邮箱", "email"),
("birthday", "用户生日", "birthday"),
("age", "用户年龄", "age"),
("zodiac", "用户星座", "zodiac"),
("lucky_number", "用户幸运数字", "lucky_number"),
("address", "用户地址", "address"),
("relationship", "用户关系", "relationship"),
("preference", "用户偏好", "preference"),
];
for line in section.lines().skip(1) {
let line = line.trim();
if line.is_empty() || line == "(none)" || line.starts_with("##") {
continue;
}
// Parse kind=value
if let Some((kind, value)) = line.split_once('=') {
let kind = kind.trim().to_lowercase();
let value = value.trim();
if value.is_empty() {
continue;
}
if let Some((_, label, static_kind)) = kind_to_label.iter().find(|(k, _, _)| *k == kind)
{
entities.push(crate::context_mgr::KeyEntity {
kind: static_kind,
value: value.to_owned(),
memory_text: format!("{label}: {value}"),
});
}
}
}
entities
}
#[cfg(test)]
mod tests {
use super::*;
fn msg(role: Role, text: &str) -> Message {
Message {
role,
content: MessageContent::Text(text.to_owned()),
rsclaw_hidden: None,
}
}
#[test]
fn count_server_slots_plain_turns() {
// [user, assistant, user, assistant] → 4 server slots, one per
// /turn input + one per assistant reply.
let msgs = vec![
msg(Role::User, "hi"),
msg(Role::Assistant, "hello"),
msg(Role::User, "bye"),
msg(Role::Assistant, "later"),
];
assert_eq!(count_server_slots(&msgs), 4);
}
#[test]
fn count_server_slots_bundles_tool_run() {
// One tool round: user, assistant(tool_calls), tool, tool, assistant.
// The server bundles the two tool_results into a SINGLE user slot
// (the tool_results /turn), so 5 local messages → 4 server slots:
// [user, assistant, user(bundled), assistant].
let msgs = vec![
msg(Role::User, "q"),
msg(Role::Assistant, "calling tools"),
msg(Role::Tool, "result-a"),
msg(Role::Tool, "result-b"),
msg(Role::Assistant, "final"),
];
assert_eq!(count_server_slots(&msgs), 4);
}
#[test]
fn count_server_slots_two_tool_rounds() {
// user, assistant(tc), [tool], assistant(tc), [tool], assistant
// → server: user, asst, user, asst, user, asst = 6 slots.
let msgs = vec![
msg(Role::User, "q"),
msg(Role::Assistant, "tc1"),
msg(Role::Tool, "r1"),
msg(Role::Assistant, "tc2"),
msg(Role::Tool, "r2"),
msg(Role::Assistant, "final"),
];
assert_eq!(count_server_slots(&msgs), 6);
}
#[test]
fn count_server_slots_skips_system() {
// A leading system message lives in the prefix, not a message slot.
let msgs = vec![
msg(Role::System, "you are concise"),
msg(Role::User, "hi"),
msg(Role::Assistant, "hello"),
];
assert_eq!(count_server_slots(&msgs), 2);
}
#[test]
fn count_server_slots_post_splice_self_consistent() {
// After a splice the local history is head(2) + summary(1 user) +
// tail. count_server_slots of that rebuild must equal
// keep_head + 1 + keep_tail so the basis stays aligned with the
// server's reported msgs_count across repeated compactions.
let keep_head = 2;
let keep_tail = 4;
let mut rebuilt = vec![
msg(Role::User, "first"),
msg(Role::Assistant, "first-reply"),
];
rebuilt.push(msg(Role::User, "[CONTEXT COMPACTION] summary"));
rebuilt.extend(vec![
msg(Role::User, "t1"),
msg(Role::Assistant, "t1r"),
msg(Role::User, "t2"),
msg(Role::Assistant, "t2r"),
]);
assert_eq!(count_server_slots(&rebuilt), keep_head + 1 + keep_tail);
}
}