use rusqlite::{params, Connection};
use tracing::warn;
use crate::error::Result;
use crate::models::mediation::TranscriptParty;
use crate::models::reasoning::TranscriptEntry;
pub fn load_transcript_for_session(
conn: &Connection,
session_id: &str,
max_rows: usize,
) -> Result<Vec<TranscriptEntry>> {
let (buyer_sp, seller_sp): (Option<String>, Option<String>) = match conn.query_row(
"SELECT buyer_shared_pubkey, seller_shared_pubkey
FROM mediation_sessions
WHERE session_id = ?1",
params![session_id],
|r| {
Ok((
r.get::<_, Option<String>>(0)?,
r.get::<_, Option<String>>(1)?,
))
},
) {
Ok(pair) => pair,
Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(Vec::new()),
Err(e) => return Err(e.into()),
};
let mut stmt = conn.prepare(
"SELECT direction, shared_pubkey, inner_event_created_at, content
FROM mediation_messages
WHERE session_id = ?1 AND stale = 0
ORDER BY inner_event_created_at DESC, id DESC
LIMIT ?2",
)?;
let rows = stmt.query_map(params![session_id, max_rows as i64], |r| {
Ok((
r.get::<_, String>(0)?,
r.get::<_, String>(1)?,
r.get::<_, i64>(2)?,
r.get::<_, String>(3)?,
))
})?;
let mut out: Vec<TranscriptEntry> = Vec::with_capacity(max_rows);
for row in rows {
let (direction, shared_pubkey, ts, content) = row?;
let party = match direction.as_str() {
"outbound" => TranscriptParty::Serbero,
"inbound" => {
if Some(shared_pubkey.as_str()) == buyer_sp.as_deref() {
TranscriptParty::Buyer
} else if Some(shared_pubkey.as_str()) == seller_sp.as_deref() {
TranscriptParty::Seller
} else {
warn!(
session_id = %session_id,
shared_pubkey = %shared_pubkey,
"transcript: dropping inbound row with unknown shared_pubkey"
);
continue;
}
}
other => {
warn!(
session_id = %session_id,
direction = other,
"transcript: dropping row with unrecognised direction"
);
continue;
}
};
out.push(TranscriptEntry {
party,
inner_event_created_at: ts,
content,
});
}
out.reverse();
Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
fn seeded_conn(buyer_sp: &str, seller_sp: &str) -> Connection {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES ('d-t117', 'evt-t117', 'mostro', 'buyer',
'initiated', 1, 2, 'notified')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO mediation_sessions (
session_id, dispute_id, state, round_count,
prompt_bundle_id, policy_hash,
buyer_shared_pubkey, seller_shared_pubkey,
started_at, last_transition_at
) VALUES ('sess-t117', 'd-t117', 'awaiting_response', 0,
'phase3-default', 'hash',
?1, ?2, 100, 100)",
params![buyer_sp, seller_sp],
)
.unwrap();
conn
}
#[allow(clippy::too_many_arguments)]
fn insert_msg(
conn: &Connection,
direction: &str,
party: &str,
shared_pubkey: &str,
inner_event_id: &str,
inner_event_created_at: i64,
content: &str,
stale: bool,
) {
conn.execute(
"INSERT INTO mediation_messages (
session_id, direction, party, shared_pubkey,
inner_event_id, inner_event_created_at,
content, persisted_at, stale
) VALUES ('sess-t117', ?1, ?2, ?3, ?4, ?5, ?6, ?5, ?7)",
params![
direction,
party,
shared_pubkey,
inner_event_id,
inner_event_created_at,
content,
if stale { 1 } else { 0 }
],
)
.unwrap();
}
#[test]
fn unknown_session_returns_empty_vec() {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
let got = load_transcript_for_session(&conn, "does-not-exist", 40).unwrap();
assert!(got.is_empty());
}
#[test]
fn session_with_no_messages_returns_empty_vec() {
let conn = seeded_conn("buyer-sp", "seller-sp");
let got = load_transcript_for_session(&conn, "sess-t117", 40).unwrap();
assert!(got.is_empty());
}
#[test]
fn normal_flow_returns_ascending_order_with_party_tags() {
let conn = seeded_conn("buyer-sp", "seller-sp");
insert_msg(
&conn,
"outbound",
"buyer",
"buyer-sp",
"e-out-1",
100,
"hello buyer",
false,
);
insert_msg(
&conn,
"outbound",
"seller",
"seller-sp",
"e-out-2",
101,
"hello seller",
false,
);
insert_msg(
&conn,
"inbound",
"buyer",
"buyer-sp",
"e-in-1",
200,
"buyer reply",
false,
);
insert_msg(
&conn,
"inbound",
"seller",
"seller-sp",
"e-in-2",
300,
"seller reply",
false,
);
let got = load_transcript_for_session(&conn, "sess-t117", 40).unwrap();
assert_eq!(got.len(), 4);
assert_eq!(got[0].inner_event_created_at, 100);
assert_eq!(got[0].party, TranscriptParty::Serbero);
assert_eq!(got[0].content, "hello buyer");
assert_eq!(got[1].party, TranscriptParty::Serbero);
assert_eq!(got[1].content, "hello seller");
assert_eq!(got[2].party, TranscriptParty::Buyer);
assert_eq!(got[2].content, "buyer reply");
assert_eq!(got[3].party, TranscriptParty::Seller);
assert_eq!(got[3].content, "seller reply");
}
#[test]
fn stale_rows_are_excluded() {
let conn = seeded_conn("buyer-sp", "seller-sp");
insert_msg(
&conn, "inbound", "buyer", "buyer-sp", "e-fresh", 100, "fresh", false,
);
insert_msg(
&conn,
"inbound",
"seller",
"seller-sp",
"e-stale",
150,
"stale",
true,
);
let got = load_transcript_for_session(&conn, "sess-t117", 40).unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].content, "fresh");
assert_eq!(got[0].party, TranscriptParty::Buyer);
}
#[test]
fn inbound_with_unknown_shared_pubkey_is_dropped() {
let conn = seeded_conn("buyer-sp", "seller-sp");
insert_msg(
&conn, "inbound", "buyer", "buyer-sp", "e-ok", 100, "ok", false,
);
insert_msg(
&conn,
"inbound",
"buyer",
"other-sp",
"e-mystery",
101,
"mystery",
false,
);
let got = load_transcript_for_session(&conn, "sess-t117", 40).unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].content, "ok");
}
#[test]
fn cap_returns_the_last_n_entries_in_ascending_order() {
let conn = seeded_conn("buyer-sp", "seller-sp");
for i in 0..10 {
insert_msg(
&conn,
"inbound",
"buyer",
"buyer-sp",
&format!("e-{i}"),
i,
&format!("msg {i}"),
false,
);
}
let got = load_transcript_for_session(&conn, "sess-t117", 3).unwrap();
assert_eq!(got.len(), 3);
assert_eq!(got[0].inner_event_created_at, 7);
assert_eq!(got[1].inner_event_created_at, 8);
assert_eq!(got[2].inner_event_created_at, 9);
}
#[test]
fn zero_max_rows_returns_empty_vec() {
let conn = seeded_conn("buyer-sp", "seller-sp");
insert_msg(&conn, "inbound", "buyer", "buyer-sp", "e", 100, "x", false);
let got = load_transcript_for_session(&conn, "sess-t117", 0).unwrap();
assert!(got.is_empty());
}
}