1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Storage Tools - Store content with automatic layer generation
use crate::{MemoryOperations, Result, types::*};
use chrono::Utc;
use cortex_mem_core::{FilesystemOperations, MessageRole};
use cortex_mem_core::memory_events::{MemoryEvent, ChangeType};
use cortex_mem_core::memory_index::MemoryScope;
use std::collections::HashMap;
impl MemoryOperations {
/// Store content with automatic L0/L1 layer generation
///
/// IMPORTANT: Layer generation is now fully asynchronous to avoid blocking
/// the agent's response. For session scope, we send LayerUpdateNeeded events
/// which are processed by MemoryEventCoordinator in the background.
pub async fn store(&self, args: StoreArgs) -> Result<StoreResponse> {
// Determine storage scope: user, session, or agent
let scope = match args.scope.as_str() {
"user" | "session" | "agent" => args.scope.as_str(),
_ => "session", // Default to session
};
// Build URI based on scope
// Note: This stores raw messages to memories/ subdirectory.
// Extracted structured memories (preferences, entities, cases, etc.)
// are stored in their respective directories after commit().
let uri = match scope {
"user" => {
// cortex://user/{user_id}/memories/YYYY-MM/DD/HH_MM_SS_id.md
let user_id = args.user_id.as_deref().unwrap_or("default");
let now = Utc::now();
let year_month = now.format("%Y-%m").to_string();
let day = now.format("%d").to_string();
let filename = format!(
"{}_{}.md",
now.format("%H_%M_%S"),
uuid::Uuid::new_v4()
.to_string()
.split('-')
.next()
.unwrap_or("unknown")
);
format!(
"cortex://user/{}/memories/{}/{}/{}",
user_id, year_month, day, filename
)
}
"agent" => {
// cortex://agent/{agent_id}/memories/YYYY-MM/DD/HH_MM_SS_id.md
let agent_id = args
.agent_id
.as_deref()
.or_else(|| {
if args.thread_id.is_empty() {
None
} else {
Some(&args.thread_id)
}
})
.unwrap_or("default");
let now = Utc::now();
let year_month = now.format("%Y-%m").to_string();
let day = now.format("%d").to_string();
let filename = format!(
"{}_{}.md",
now.format("%H_%M_%S"),
uuid::Uuid::new_v4()
.to_string()
.split('-')
.next()
.unwrap_or("unknown")
);
format!(
"cortex://agent/{}/memories/{}/{}/{}",
agent_id, year_month, day, filename
)
}
"session" => {
// cortex://session/{thread_id}/timeline/YYYY-MM/DD/HH_MM_SS_id.md
let thread_id = if args.thread_id.is_empty() {
"default".to_string()
} else {
args.thread_id.clone()
};
// 🔧 Fix: Release lock immediately after operations
let message = {
let sm = self.session_manager.write().await;
// 🔧 Ensure session exists with user_id and agent_id
if !sm.session_exists(&thread_id).await? {
// 使用create_session_with_ids传入user_id和agent_id
sm.create_session_with_ids(
&thread_id,
args.user_id
.clone()
.or_else(|| Some(self.default_user_id.clone())),
args.agent_id
.clone()
.or_else(|| Some(self.default_agent_id.clone())),
)
.await?;
} else {
// 🔧 如果session已存在但缺少user_id/agent_id,更新它
if let Ok(mut metadata) = sm.load_session(&thread_id).await {
let mut needs_update = false;
if metadata.user_id.is_none() {
metadata.user_id = args
.user_id
.clone()
.or_else(|| Some(self.default_user_id.clone()));
needs_update = true;
}
if metadata.agent_id.is_none() {
metadata.agent_id = args
.agent_id
.clone()
.or_else(|| Some(self.default_agent_id.clone()));
needs_update = true;
}
if needs_update {
let _ = sm.update_session(&metadata).await;
}
}
}
// 使用add_message()发布事件,而不是直接调用save_message()
sm.add_message(
&thread_id,
MessageRole::User, // 默认使用User角色
args.content.clone(),
)
.await?
}; // Lock is released here
// 返回消息URI
let year_month = message.timestamp.format("%Y-%m").to_string();
let day = message.timestamp.format("%d").to_string();
let filename = format!(
"{}_{}.md",
message.timestamp.format("%H_%M_%S"),
&message.id[..8]
);
format!(
"cortex://session/{}/timeline/{}/{}/{}",
thread_id, year_month, day, filename
)
}
_ => unreachable!(),
};
// For user and agent scope, directly write to filesystem
if scope == "user" || scope == "agent" {
self.filesystem.write(&uri, &args.content).await?;
}
// 🔧 Layer generation is now FULLY ASYNCHRONOUS
// We send events to MemoryEventCoordinator which processes them in background
// This prevents blocking the agent's response
let layers_generated = HashMap::new();
if args.auto_generate_layers.unwrap_or(true) {
match scope {
"user" => {
// Send LayerUpdateNeeded event for user scope
if let Some(ref tx) = self.memory_event_tx {
let user_id = args.user_id.clone().unwrap_or_else(|| self.default_user_id.clone());
let parent_dir = uri.rsplit_once('/')
.map(|(dir, _)| dir.to_string())
.unwrap_or_else(|| uri.clone());
let _ = tx.send(MemoryEvent::LayerUpdateNeeded {
scope: MemoryScope::User,
owner_id: user_id,
directory_uri: parent_dir,
change_type: ChangeType::Add,
changed_file: uri.clone(),
});
tracing::debug!("📤 Sent LayerUpdateNeeded event for user scope");
} else {
// Fallback: synchronous generation (should not happen in production)
tracing::warn!("⚠️ memory_event_tx not available, falling back to sync generation");
if let Err(e) = self.layer_manager.generate_all_layers(&uri, &args.content, &[]).await {
tracing::warn!("Failed to generate layers for {}: {}", uri, e);
}
}
}
"agent" => {
// Send LayerUpdateNeeded event for agent scope
if let Some(ref tx) = self.memory_event_tx {
let agent_id = args.agent_id.clone()
.or_else(|| Some(args.thread_id.clone()))
.unwrap_or_else(|| self.default_agent_id.clone());
let parent_dir = uri.rsplit_once('/')
.map(|(dir, _)| dir.to_string())
.unwrap_or_else(|| uri.clone());
let _ = tx.send(MemoryEvent::LayerUpdateNeeded {
scope: MemoryScope::Agent,
owner_id: agent_id,
directory_uri: parent_dir,
change_type: ChangeType::Add,
changed_file: uri.clone(),
});
tracing::debug!("📤 Sent LayerUpdateNeeded event for agent scope");
} else {
tracing::warn!("⚠️ memory_event_tx not available, falling back to sync generation");
if let Err(e) = self.layer_manager.generate_all_layers(&uri, &args.content, &[]).await {
tracing::warn!("Failed to generate layers for {}: {}", uri, e);
}
}
}
"session" => {
// Session scope: Send LayerUpdateNeeded for the timeline directory
// Layer generation is deferred to session close for efficiency
// But we can optionally trigger incremental updates here
if let Some(ref tx) = self.memory_event_tx {
let thread_id = if args.thread_id.is_empty() {
"default".to_string()
} else {
args.thread_id.clone()
};
let parent_dir = uri.rsplit_once('/')
.map(|(dir, _)| dir.to_string())
.unwrap_or_else(|| uri.clone());
let _ = tx.send(MemoryEvent::LayerUpdateNeeded {
scope: MemoryScope::Session,
owner_id: thread_id,
directory_uri: parent_dir,
change_type: ChangeType::Add,
changed_file: uri.clone(),
});
tracing::debug!("📤 Sent LayerUpdateNeeded event for session scope");
}
// Note: Session-level layers are primarily generated on session close
// This event enables optional incremental updates
}
_ => {}
}
}
Ok(StoreResponse {
uri,
layers_generated,
success: true,
})
}
}