use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use chrono::{DateTime, Utc};
use kanade_shared::wire::ObsEvent;
use serde_json::{Value, json};
use tracing::warn;
/// Interval between successive polls driven by the ticker in `run` (5 minutes).
const POLL_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Look-back window, in hours, subtracted from "now" in `poll_once` to form the
/// lower time bound of every event query.
const BOOTSTRAP_HOURS: i64 = 24;
/// One event-log source to poll: a channel, an optional provider filter, an
/// event ID, and the `kind` label stamped on events emitted for it.
struct Source {
/// Event-log channel name handed to `query_events` (e.g. `"System"`).
channel: &'static str,
/// Provider name pinned in the query; `None` omits the provider predicate.
provider: Option<&'static str>,
/// Event ID matched by the query; `shape` also dispatches on it.
id: u32,
/// Value copied into `ObsEvent::kind` for events from this source.
kind: &'static str,
}
/// Every event source polled on each cycle, in a fixed order.
///
/// The unit tests in this file address entries by position (`SOURCES[0]`,
/// `SOURCES[13]`, ...), so inserting, removing or reordering entries requires
/// updating those indices. Several IDs intentionally share a `kind`
/// (`"sleep"`: 42 and 506; `"resume"`: 107, 507 and 27); `source_key` keeps
/// their watermarks separate because the key also contains the ID.
const SOURCES: &[Source] = &[
// [0]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Winlogon"),
id: 7001,
kind: "logon",
},
// [1]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Winlogon"),
id: 7002,
kind: "logoff",
},
// [2] No provider filter: the query matches on channel + event ID only.
Source {
channel: "Security",
provider: None,
id: 4800,
kind: "lock",
},
// [3]
Source {
channel: "Security",
provider: None,
id: 4801,
kind: "unlock",
},
// [4]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-General"),
id: 12,
kind: "boot",
},
// [5]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-General"),
id: 13,
kind: "shutdown",
},
// [6]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-Power"),
id: 41,
kind: "unexpected_shutdown",
},
// [7]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-Power"),
id: 42,
kind: "sleep",
},
// [8]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-Power"),
id: 107,
kind: "resume",
},
// [9] `shape` tags 506/507 payloads with `"standby": "modern"`.
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-Power"),
id: 506,
kind: "sleep",
},
// [10]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-Power"),
id: 507,
kind: "resume",
},
// [11] `shape` only emits this one when its first data value parses to 2.
Source {
channel: "System",
provider: Some("Microsoft-Windows-Kernel-Boot"),
id: 27,
kind: "resume",
},
// [12]
Source {
channel: "System",
provider: Some("Microsoft-Windows-Power-Troubleshooter"),
id: 1,
kind: "wake_detail",
},
// [13]
Source {
channel: "System",
provider: None,
id: 6005,
kind: "log_service_started",
},
// [14]
Source {
channel: "System",
provider: None,
id: 6006,
kind: "log_service_stopped",
},
];
/// Per-source high-water marks: maps a `source_key` string to the largest
/// event record ID already handled for that source. Serialized as JSON by
/// `save_watermarks` and read back by `load_watermarks`.
type Watermarks = HashMap<String, u64>;
/// Builds the watermark-map key for a source.
///
/// The key is the four fields joined with `:` in the order
/// `channel:provider:id:kind`; a source without a provider contributes an
/// empty segment (e.g. `Security::4800:lock`).
fn source_key(s: &Source) -> String {
    let provider = s.provider.unwrap_or_default();
    let segments = [
        s.channel.to_string(),
        provider.to_string(),
        s.id.to_string(),
        s.kind.to_string(),
    ];
    segments.join(":")
}
/// Builds the XPath-style event query for one source.
///
/// The query always constrains the event ID and requires
/// `TimeCreated/@SystemTime >= since`, where `since` is rendered as RFC 3339
/// with millisecond precision and a `Z` suffix. When the source names a
/// provider, a `Provider[@Name='…']` predicate is placed in front.
fn build_query(s: &Source, since: DateTime<Utc>) -> String {
    let since = since.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    let provider = s
        .provider
        .map(|p| format!("Provider[@Name='{p}'] and "))
        .unwrap_or_default();
    let id = s.id;
    format!("*[System[{provider}(EventID={id}) and TimeCreated[@SystemTime>='{since}']]]")
}
/// The subset of a rendered event's XML that this module consumes, as
/// extracted by `parse_event_xml`.
struct ParsedEvent {
/// Value of `System/EventRecordID`; compared against the per-source watermark.
record_id: u64,
/// `System/TimeCreated/@SystemTime`, converted to UTC.
at: DateTime<Utc>,
/// `EventData/Data` values that carry a `Name` attribute, keyed by that name.
named: HashMap<String, String>,
/// `EventData/Data` values without a `Name` attribute, in document order.
positional: Vec<String>,
}
/// Returns the first direct child element of `n` whose local tag name equals
/// `local` (the namespace is ignored), or `None` if there is no such child.
fn child<'a>(n: roxmltree::Node<'a, 'a>, local: &str) -> Option<roxmltree::Node<'a, 'a>> {
    for candidate in n.children() {
        if candidate.is_element() && candidate.tag_name().name() == local {
            return Some(candidate);
        }
    }
    None
}
/// Parses one rendered event XML document into a [`ParsedEvent`].
///
/// Returns `None` when the text is not well-formed XML, when the `System`
/// element, its `EventRecordID` (as a `u64`) or its
/// `TimeCreated/@SystemTime` (as RFC 3339) is missing or unparsable.
/// `EventData` is optional: each `Data` child with a `Name` attribute goes
/// into `named`, every other `Data` child is appended to `positional`.
fn parse_event_xml(xml: &str) -> Option<ParsedEvent> {
    let doc = roxmltree::Document::parse(xml).ok()?;
    let root = doc.root_element();
    let system = child(root, "System")?;

    let record_text = child(system, "EventRecordID")?.text()?;
    let record_id: u64 = record_text.trim().parse().ok()?;

    let stamp = child(system, "TimeCreated")?.attribute("SystemTime")?;
    let at = DateTime::parse_from_rfc3339(stamp)
        .ok()?
        .with_timezone(&Utc);

    let mut named = HashMap::new();
    let mut positional = Vec::new();
    // A missing <EventData> simply yields no data nodes at all.
    let data_nodes = child(root, "EventData")
        .into_iter()
        .flat_map(|event_data| event_data.children())
        .filter(|c| c.is_element() && c.tag_name().name() == "Data");
    for d in data_nodes {
        // An empty <Data/> element is kept as an empty string.
        let value = d.text().unwrap_or_default().to_owned();
        if let Some(name) = d.attribute("Name") {
            named.insert(name.to_owned(), value);
        } else {
            positional.push(value);
        }
    }

    Some(ParsedEvent {
        record_id,
        at,
        named,
        positional,
    })
}
/// Outcome of shaping one parsed event into an outgoing payload.
enum Shaped {
/// Emit an event carrying this JSON payload (may be `Value::Null`).
Emit(Value),
/// Do not emit anything for this event; `poll_once` still advances the
/// watermark past it.
Skip,
}
/// Turns a parsed event into the JSON payload for its source, or decides to
/// skip it.
///
/// Dispatch is on `s.id`:
/// * `7001 | 7002` — `{ user, sid, session_id }`; `user` is `resolve_user(sid)`
///   and the resolver is only invoked when a SID is present.
/// * `4800 | 4801` — `{ user, session_id }` from the named fields
///   `TargetUserName` / `SessionId`.
/// * `506 | 507` — `{ "standby": "modern" }`.
/// * `27` — `{ "from": "hibernate" }` when the boot-type value is `2`,
///   otherwise [`Shaped::Skip`].
/// * `1` — `{ sleep_start, wake_time, wake_source }`, preferring a non-blank
///   `WakeSourceText` over `WakeSourceType`.
/// * anything else — a `null` payload.
///
/// For 7001/7002 and 27 the value is taken from the positional data list
/// first and, when that slot is absent, from the named field (`TSId`,
/// `UserSid`, `BootType`). `parse_event_xml` files every
/// `<Data Name='…'>` element under `named` only, so without the fallback an
/// event rendered with named data would lose its user/SID/session or always
/// be skipped.
/// NOTE(review): the field names are those Windows is believed to use for
/// these records — confirm against an event captured on a target machine.
fn shape(s: &Source, ev: &ParsedEvent, mut resolve_user: impl FnMut(&str) -> String) -> Shaped {
    /// Reads a data value by position, falling back to the named field.
    fn field<'e>(ev: &'e ParsedEvent, index: usize, name: &str) -> Option<&'e str> {
        ev.positional
            .get(index)
            .or_else(|| ev.named.get(name))
            .map(String::as_str)
    }
    /// Same lookup as `field`, parsed as a trimmed decimal `i64`.
    fn int_field(ev: &ParsedEvent, index: usize, name: &str) -> Option<i64> {
        field(ev, index, name).and_then(|v| v.trim().parse::<i64>().ok())
    }

    match s.id {
        7001 | 7002 => {
            let sid = field(ev, 1, "UserSid");
            let user = sid.map(&mut resolve_user);
            let session_id = int_field(ev, 0, "TSId");
            Shaped::Emit(json!({ "user": user, "sid": sid, "session_id": session_id }))
        }
        4800 | 4801 => {
            let user = ev.named.get("TargetUserName").map(String::as_str);
            let session_id = ev
                .named
                .get("SessionId")
                .and_then(|v| v.trim().parse::<i64>().ok());
            Shaped::Emit(json!({ "user": user, "session_id": session_id }))
        }
        506 | 507 => Shaped::Emit(json!({ "standby": "modern" })),
        27 => match int_field(ev, 0, "BootType") {
            Some(2) => Shaped::Emit(json!({ "from": "hibernate" })),
            // Any other (or missing/unparsable) boot type is not reported.
            _ => Shaped::Skip,
        },
        1 => {
            let wake_source = ev
                .named
                .get("WakeSourceText")
                .filter(|v| !v.trim().is_empty())
                .or_else(|| ev.named.get("WakeSourceType"))
                .map(String::as_str);
            Shaped::Emit(json!({
                "sleep_start": ev.named.get("SleepTime").map(String::as_str),
                "wake_time": ev.named.get("WakeTime").map(String::as_str),
                "wake_source": wake_source,
            }))
        }
        // Presence-only events: the kind and timestamp say everything.
        _ => Shaped::Emit(Value::Null),
    }
}
/// Snapshot of the machine's domain situation, computed by `domain_ctx` and
/// consulted by `should_translate`. The default is "not joined, not reachable".
#[derive(Clone, Copy, Default)]
struct DomainCtx {
/// `true` when `dns_domain` reported a non-empty DNS domain name.
domain_joined: bool,
/// `true` when `domain_reachable` resolved that domain name in time.
domain_reachable: bool,
}
/// Reports whether `sid` starts with the `S-1-5-21-` prefix; only such SIDs
/// are subject to the domain-reachability gate in `should_translate`.
fn is_account_sid(sid: &str) -> bool {
    const ACCOUNT_PREFIX: &str = "S-1-5-21-";
    sid.strip_prefix(ACCOUNT_PREFIX).is_some()
}
/// Decides whether a SID-to-name lookup should be attempted.
///
/// The lookup is always attempted for SIDs that are not account SIDs and on
/// machines that are not domain-joined; for an account SID on a joined
/// machine it is attempted only while the domain is reachable.
fn should_translate(sid: &str, ctx: DomainCtx) -> bool {
    !is_account_sid(sid) || !ctx.domain_joined || ctx.domain_reachable
}
/// Resolves SID strings to account names, remembering successful lookups.
#[derive(Default)]
struct SidResolver {
    /// Successful translations only, keyed by the SID string.
    cache: HashMap<String, String>,
}

impl SidResolver {
    /// Returns the account name for `sid`, or `sid` itself when no name is
    /// available.
    ///
    /// A cached name is returned without consulting `ctx`. Otherwise a lookup
    /// is attempted only if `should_translate` allows it; a successful result
    /// is cached, while a skipped or failed lookup is *not* cached so that a
    /// later call can try again.
    fn resolve(&mut self, sid: &str, ctx: DomainCtx) -> String {
        if let Some(hit) = self.cache.get(sid) {
            return hit.to_owned();
        }
        let looked_up = if should_translate(sid, ctx) {
            translate_sid(sid)
        } else {
            None
        };
        match looked_up {
            Some(name) => {
                self.cache.insert(sid.to_owned(), name.clone());
                name
            }
            None => sid.to_owned(),
        }
    }
}
#[cfg(target_os = "windows")]
fn domain_ctx() -> DomainCtx {
match dns_domain() {
Some(domain) if !domain.is_empty() => DomainCtx {
domain_joined: true,
domain_reachable: domain_reachable(&domain),
},
_ => DomainCtx::default(),
}
}
#[cfg(not(target_os = "windows"))]
fn domain_ctx() -> DomainCtx {
DomainCtx::default()
}
/// Reads the machine's DNS domain name via `GetComputerNameExW` with
/// `ComputerNameDnsDomain`.
///
/// Returns `Some("")` when the sizing call reports a required length of
/// zero, `None` when the filling call fails, and otherwise the name decoded
/// (lossily) from UTF-16.
#[cfg(target_os = "windows")]
fn dns_domain() -> Option<String> {
use windows::Win32::System::SystemInformation::{ComputerNameDnsDomain, GetComputerNameExW};
use windows::core::PWSTR;
let mut len: u32 = 0;
// Sizing call: no buffer is supplied, so the result is deliberately ignored
// and only the length written to `len` is used.
unsafe {
let _ = GetComputerNameExW(ComputerNameDnsDomain, None, &mut len);
}
if len == 0 {
return Some(String::new());
}
let mut buf = vec![0u16; len as usize];
// SAFETY: `buf` holds exactly `len` UTF-16 units, the size reported by the
// sizing call above, and outlives the call.
unsafe {
GetComputerNameExW(
ComputerNameDnsDomain,
Some(PWSTR(buf.as_mut_ptr())),
&mut len,
)
}
.ok()?;
// After the filling call `len` is used as the number of units to decode.
// NOTE(review): presumably excludes the trailing NUL on success — confirm
// against the GetComputerNameExW documentation.
Some(String::from_utf16_lossy(&buf[..len as usize]))
}
/// Probes whether `domain` currently resolves to at least one address.
///
/// Name resolution of `"{domain}:0"` runs on a helper thread; this function
/// waits at most two seconds for its answer and reports `false` on timeout,
/// on a resolution error, or when no address comes back. A resolution that
/// outlives the timeout is left to finish on its own; its result is dropped.
#[cfg(target_os = "windows")]
fn domain_reachable(domain: &str) -> bool {
    use std::net::ToSocketAddrs;
    use std::sync::mpsc;

    let (tx, rx) = mpsc::channel();
    let target = format!("{domain}:0");
    std::thread::spawn(move || {
        let resolved = match target.as_str().to_socket_addrs() {
            Ok(mut addrs) => addrs.next().is_some(),
            Err(_) => false,
        };
        // The receiver may already have given up; that is fine.
        let _ = tx.send(resolved);
    });

    match rx.recv_timeout(Duration::from_secs(2)) {
        Ok(resolved) => resolved,
        Err(_) => false,
    }
}
/// Translates a SID string into an account name on Windows.
///
/// The string is converted to a binary SID with `ConvertStringSidToSidW`,
/// then `LookupAccountSidW` is called twice (once to size the buffers, once
/// to fill them). Only the account-name part is returned; the domain part is
/// fetched but discarded. Returns `None` when the conversion fails, when the
/// sizing call reports no name, when the lookup fails, or when the resulting
/// name is empty.
#[cfg(target_os = "windows")]
fn translate_sid(sid: &str) -> Option<String> {
use windows::Win32::Foundation::{HLOCAL, LocalFree};
use windows::Win32::Security::Authorization::ConvertStringSidToSidW;
use windows::Win32::Security::{LookupAccountSidW, PSID, SidTypeUser};
use windows::core::{HSTRING, PWSTR};
let wide = HSTRING::from(sid);
let mut psid = PSID::default();
if unsafe { ConvertStringSidToSidW(&wide, &mut psid) }.is_err() || psid.is_invalid() {
return None;
}
// The lookup lives in a closure so that every early return inside it still
// reaches the `LocalFree` of `psid` below.
let lookup = || {
let mut name_len: u32 = 0;
let mut domain_len: u32 = 0;
let mut sid_use = SidTypeUser;
// Sizing call: no buffers are supplied, so the result is deliberately
// ignored and only the lengths written back are used.
unsafe {
let _ = LookupAccountSidW(
None,
psid,
None,
&mut name_len,
None,
&mut domain_len,
&mut sid_use,
);
}
if name_len == 0 {
return None;
}
let mut name = vec![0u16; name_len as usize];
// `max(1)` keeps the domain buffer non-empty even when no domain length
// was reported.
let mut domain = vec![0u16; domain_len.max(1) as usize];
// SAFETY: both buffers are sized from the lengths reported by the sizing
// call and outlive the call; `psid` is still allocated here.
unsafe {
LookupAccountSidW(
None,
psid,
Some(PWSTR(name.as_mut_ptr())),
&mut name_len,
Some(PWSTR(domain.as_mut_ptr())),
&mut domain_len,
&mut sid_use,
)
}
.ok()?;
let leaf = String::from_utf16_lossy(&name[..name_len as usize]);
// Defensive: drop any trailing NULs that were counted in `name_len`.
let leaf = leaf.trim_end_matches('\0');
if leaf.is_empty() {
None
} else {
Some(leaf.to_string())
}
};
let result = lookup();
// Release the SID produced by `ConvertStringSidToSidW`, on success and
// failure alike; a failure to free is ignored.
unsafe {
let _ = LocalFree(Some(HLOCAL(psid.0)));
}
result
}
/// Non-Windows stand-in: no translation is available, so this always
/// returns `None` and callers fall back to the raw SID string.
#[cfg(not(target_os = "windows"))]
fn translate_sid(_sid: &str) -> Option<String> {
None
}
/// Loads the persisted watermark map from `path`.
///
/// Never fails: a missing file silently yields an empty map, while an
/// unreadable or unparsable file logs a warning and also yields an empty map,
/// which makes the next poll start from the bootstrap window.
fn load_watermarks(path: &Path) -> Watermarks {
    let bytes = match std::fs::read(path) {
        Ok(bytes) => bytes,
        Err(e) => {
            // "Not found" is the normal first-run case and stays quiet.
            if e.kind() != std::io::ErrorKind::NotFound {
                warn!(error = %e, "winlog: watermark file unreadable — bootstrapping fresh");
            }
            return Watermarks::new();
        }
    };
    match serde_json::from_slice(&bytes) {
        Ok(parsed) => parsed,
        Err(e) => {
            warn!(error = %e, "winlog: watermark file corrupt — bootstrapping fresh");
            Watermarks::new()
        }
    }
}
/// Persists the watermark map to `path`, best-effort.
///
/// The JSON is written to a sibling temp file (`path` with its extension
/// replaced by `json.tmp`), flushed to disk, and then renamed over `path`, so
/// readers see either the previous file or the complete new one. Flushing
/// before the rename matters: without it a crash shortly after the rename
/// could leave a truncated file at `path`, which `load_watermarks` would
/// treat as corrupt and restart from the bootstrap window.
///
/// Failures are logged, not returned; on failure the temp file is removed so
/// it does not linger next to the real file.
fn save_watermarks(path: &Path, w: &Watermarks) {
    let tmp = path.with_extension("json.tmp");
    let persist = || -> anyhow::Result<()> {
        let bytes = serde_json::to_vec(w)?;
        let mut file = std::fs::File::create(&tmp)?;
        std::io::Write::write_all(&mut file, &bytes)?;
        file.sync_all()?;
        // Close the handle before renaming the file it refers to.
        drop(file);
        std::fs::rename(&tmp, path)?;
        Ok(())
    };
    if let Err(e) = persist() {
        // Best-effort cleanup; the temp file may not exist at all.
        let _ = std::fs::remove_file(&tmp);
        warn!(error = %e, "winlog: watermark persist failed");
    }
}
/// Runs one polling pass over every entry in `SOURCES`.
///
/// For each source the last `BOOTSTRAP_HOURS` hours are queried, the results
/// are parsed and ordered by record ID, and every event whose record ID is
/// above the source's watermark is shaped and enqueued into the outbox in
/// `dir`. The watermark advances past each event that was enqueued or
/// deliberately skipped. A failed query skips the source for this pass; a
/// failed enqueue stops the source at that event so it is retried next pass.
///
/// Takes and returns the watermark map and the SID resolver by value so the
/// caller can move them through a blocking task.
///
/// NOTE(review): watermarks compare record IDs only; if record IDs ever
/// restart below a stored watermark (presumably after a log is cleared),
/// new events would be filtered out until they pass it — confirm whether
/// that case needs handling.
fn poll_once(
    pc_id: &str,
    dir: &Path,
    mut watermarks: Watermarks,
    mut resolver: SidResolver,
) -> (Watermarks, SidResolver) {
    let since = Utc::now() - chrono::Duration::hours(BOOTSTRAP_HOURS);
    let ctx = domain_ctx();

    for s in SOURCES {
        let key = source_key(s);
        let cutoff = watermarks.get(&key).copied();

        let query = build_query(s, since);
        let xmls = match query_events(s.channel, &query) {
            Ok(x) => x,
            Err(e) => {
                warn!(error = %e, source = %key, "winlog: query failed");
                continue;
            }
        };

        // Unparsable events are dropped; the rest are handled oldest first.
        let mut events: Vec<ParsedEvent> =
            xmls.iter().filter_map(|x| parse_event_xml(x)).collect();
        events.sort_by_key(|e| e.record_id);

        let mut max_seen = cutoff;
        let fresh = events
            .iter()
            .filter(|ev| cutoff.map_or(true, |c| ev.record_id > c));
        for ev in fresh {
            if let Shaped::Emit(payload) = shape(s, ev, |sid| resolver.resolve(sid, ctx)) {
                let event = ObsEvent {
                    pc_id: pc_id.to_string(),
                    at: ev.at,
                    kind: s.kind.to_string(),
                    source: format!("winlog:{}", s.channel),
                    event_record_id: Some(ev.record_id.to_string()),
                    payload,
                };
                if let Err(e) = crate::obs_outbox::enqueue(dir, &event) {
                    warn!(error = %e, kind = s.kind, "winlog: enqueue failed — retrying next poll");
                    // Leave the watermark before this event.
                    break;
                }
            }
            // Reached for enqueued and skipped events alike.
            max_seen = Some(max_seen.map_or(ev.record_id, |m| m.max(ev.record_id)));
        }

        match max_seen {
            Some(m) if Some(m) != cutoff => {
                watermarks.insert(key, m);
            }
            _ => {}
        }
    }

    (watermarks, resolver)
}
/// Entry point of the event-log reader: polls forever, once per
/// `POLL_INTERVAL`, and never returns.
///
/// Each tick moves the watermark map and SID resolver into a blocking task
/// that runs `poll_once` and then persists the watermarks, and takes them back
/// when the task completes. If the task itself fails, the watermarks are
/// reloaded from disk and the resolver starts over with an empty cache.
/// Missed ticks are skipped rather than replayed.
///
/// * `pc_id` — identifier stamped on every emitted event.
/// * `obs_outbox_dir` — outbox directory events are enqueued into.
pub async fn run(pc_id: String, obs_outbox_dir: PathBuf) {
    if let Err(e) = crate::obs_outbox::ensure_outbox_dir(&obs_outbox_dir) {
        warn!(error = %e, "winlog: outbox dir — events may be dropped until it exists");
    }

    let watermark_path =
        kanade_shared::default_paths::data_dir().join("winlog-reader-watermark.json");
    let mut watermarks = load_watermarks(&watermark_path);
    let mut resolver = SidResolver::default();

    let mut tick = tokio::time::interval(POLL_INTERVAL);
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    loop {
        tick.tick().await;

        // Hand the state to the blocking task by value; both locals are left
        // at their defaults until the task gives the state back.
        let job = {
            let pc = pc_id.clone();
            let dir = obs_outbox_dir.clone();
            let wpath = watermark_path.clone();
            let wm = std::mem::take(&mut watermarks);
            let res = std::mem::take(&mut resolver);
            move || {
                let (wm, res) = poll_once(&pc, &dir, wm, res);
                save_watermarks(&wpath, &wm);
                (wm, res)
            }
        };

        match tokio::task::spawn_blocking(job).await {
            Ok((wm, res)) => {
                watermarks = wm;
                resolver = res;
            }
            Err(e) => {
                warn!(error = %e, "winlog: poll task failed");
                watermarks = load_watermarks(&watermark_path);
            }
        }
    }
}
/// Runs `query` against the event-log `channel` and returns the XML of every
/// matching event that could be rendered, oldest first (forward direction).
///
/// Fails only when `EvtQuery` itself fails. Events that cannot be rendered
/// are silently left out.
#[cfg(target_os = "windows")]
fn query_events(channel: &str, query: &str) -> anyhow::Result<Vec<String>> {
use windows::Win32::System::EventLog::{
EvtClose, EvtNext, EvtQuery, EvtQueryChannelPath, EvtQueryForwardDirection,
};
use windows::core::HSTRING;
let result_set = unsafe {
EvtQuery(
None,
&HSTRING::from(channel),
&HSTRING::from(query),
EvtQueryChannelPath.0 | EvtQueryForwardDirection.0,
)
}?;
let mut out = Vec::new();
// Events are pulled from the result set in batches of this many handles.
const BATCH: usize = 32;
loop {
let mut handles = [0isize; BATCH];
let mut returned: u32 = 0;
// Any failure ends the loop, including the end-of-results condition.
// NOTE(review): other errors are not distinguished from end-of-results and
// the timeout argument is 0 — confirm both are intended.
let ok = unsafe { EvtNext(result_set, &mut handles, 0, 0, &mut returned) }.is_ok();
if !ok {
break;
}
// Only the first `returned` slots hold handles from this batch.
for &h in handles.iter().take(returned as usize) {
let event = windows::Win32::System::EventLog::EVT_HANDLE(h);
if let Some(xml) = render_event_xml(event) {
out.push(xml);
}
// Each event handle is closed whether or not rendering succeeded.
unsafe {
let _ = EvtClose(event);
}
}
}
unsafe {
let _ = EvtClose(result_set);
}
Ok(out)
}
/// Renders one event handle to its XML text with `EvtRender`.
///
/// Returns `None` when the sizing call reports that no buffer space is needed
/// or when the rendering call fails. The handle is not closed here.
#[cfg(target_os = "windows")]
fn render_event_xml(event: windows::Win32::System::EventLog::EVT_HANDLE) -> Option<String> {
use windows::Win32::System::EventLog::{EvtRender, EvtRenderEventXml};
let mut used: u32 = 0;
let mut props: u32 = 0;
// Sizing call: a zero-length buffer is passed, so the result is deliberately
// ignored and only the size written to `used` is kept.
unsafe {
let _ = EvtRender(
None,
event,
EvtRenderEventXml.0,
0,
None,
&mut used,
&mut props,
);
}
if used == 0 {
return None;
}
// `used` is passed back below as the buffer size, so it is treated as a byte
// count here: half as many UTF-16 units, rounded up.
let mut buf = vec![0u16; (used as usize).div_ceil(2)];
// SAFETY: `buf` provides at least `used` bytes and outlives the call.
unsafe {
EvtRender(
None,
event,
EvtRenderEventXml.0,
used,
Some(buf.as_mut_ptr() as *mut core::ffi::c_void),
&mut used,
&mut props,
)
}
.ok()?;
// Decode up to the first NUL (or the whole buffer if there is none).
let end = buf.iter().position(|&c| c == 0).unwrap_or(buf.len());
Some(String::from_utf16_lossy(&buf[..end]))
}
/// Non-Windows stand-in for the event-log query: always succeeds and reports
/// no events, so a polling pass does nothing on these platforms.
#[cfg(not(target_os = "windows"))]
fn query_events(_channel: &str, _query: &str) -> anyhow::Result<Vec<String>> {
    let no_events: Vec<String> = Vec::new();
    Ok(no_events)
}
// Unit tests for the platform-independent parts: query building, XML parsing,
// payload shaping, SID-translation gating and watermark persistence.
// Several tests pick a source by its position in `SOURCES`.
#[cfg(test)]
mod tests {
use super::*;
/// Builds a minimal event document with record ID 4242 and a fixed timestamp;
/// `system_extra` is spliced into `<System>` and `event_data` after it.
fn ev_xml(system_extra: &str, event_data: &str) -> String {
format!(
"<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>\
<System>{system_extra}\
<TimeCreated SystemTime='2026-06-27T01:02:03.123456700Z'/>\
<EventRecordID>4242</EventRecordID>\
</System>{event_data}</Event>"
)
}
#[test]
fn build_query_pins_provider_and_time() {
// SOURCES[0] is the Winlogon 7001 ("logon") entry.
let s = &SOURCES[0]; let q = build_query(
s,
DateTime::parse_from_rfc3339("2026-06-26T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
);
assert!(
q.contains("Provider[@Name='Microsoft-Windows-Winlogon']"),
"q: {q}"
);
assert!(q.contains("(EventID=7001)"), "q: {q}");
assert!(
q.contains("TimeCreated[@SystemTime>='2026-06-26T00:00:00.000Z']"),
"q: {q}"
);
// SOURCES[13] is the provider-less 6005 entry: no Provider predicate.
let bare = build_query(&SOURCES[13], Utc::now()); assert!(!bare.contains("Provider"), "q: {bare}");
assert!(bare.contains("(EventID=6005)"), "q: {bare}");
}
#[test]
fn parse_extracts_system_and_event_data() {
let xml = ev_xml(
"",
"<EventData><Data Name='TargetUserName'>alice</Data>\
<Data Name='SessionId'>3</Data></EventData>",
);
let p = parse_event_xml(&xml).expect("parse");
assert_eq!(p.record_id, 4242);
assert_eq!(p.at.to_rfc3339(), "2026-06-27T01:02:03.123456700+00:00");
assert_eq!(
p.named.get("TargetUserName").map(String::as_str),
Some("alice")
);
assert_eq!(p.named.get("SessionId").map(String::as_str), Some("3"));
// Named data must not also show up in the positional list.
assert!(p.positional.is_empty());
}
#[test]
fn parse_rejects_missing_system_fields() {
// <System> is present but has no EventRecordID.
let xml = "<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>\
<System><TimeCreated SystemTime='2026-06-27T01:02:03Z'/></System></Event>";
assert!(parse_event_xml(xml).is_none());
assert!(parse_event_xml("not xml at all").is_none());
}
/// Parses a document built by `ev_xml`, panicking if it does not parse.
fn parse(system_extra: &str, event_data: &str) -> ParsedEvent {
parse_event_xml(&ev_xml(system_extra, event_data)).expect("parse")
}
/// Resolver stand-in that returns the SID unchanged.
fn raw_user(sid: &str) -> String {
sid.to_string()
}
#[test]
fn shape_logon_resolves_user_and_keeps_raw_sid() {
// SOURCES[0] is the Winlogon 7001 ("logon") entry.
let s = &SOURCES[0]; let ev = parse(
"",
"<EventData><Data>2</Data><Data>S-1-5-21-1-2-3-1001</Data></EventData>",
);
let Shaped::Emit(p) = shape(s, &ev, |sid| {
assert_eq!(sid, "S-1-5-21-1-2-3-1001");
"alice".to_string()
}) else {
panic!("expected emit")
};
assert_eq!(p["session_id"], json!(2));
assert_eq!(p["sid"], json!("S-1-5-21-1-2-3-1001"));
assert_eq!(p["user"], json!("alice"));
// With a pass-through resolver the user is the raw SID.
let Shaped::Emit(p) = shape(s, &ev, raw_user) else {
panic!("expected emit")
};
assert_eq!(p["user"], json!("S-1-5-21-1-2-3-1001"));
// Only one data value: there is no SID, so the resolver must not be called.
let ev_nosid = parse("", "<EventData><Data>5</Data></EventData>");
let Shaped::Emit(p) = shape(s, &ev_nosid, |_| {
panic!("resolver must not run with no SID")
}) else {
panic!("expected emit")
};
assert_eq!(p["user"], json!(null));
assert_eq!(p["sid"], json!(null));
assert_eq!(p["session_id"], json!(5));
}
#[test]
fn should_translate_gates_domain_sids_on_reachability() {
let reachable = DomainCtx {
domain_joined: true,
domain_reachable: true,
};
let offline = DomainCtx {
domain_joined: true,
domain_reachable: false,
};
let workgroup = DomainCtx {
domain_joined: false,
domain_reachable: false,
};
// A SID without the account prefix is always translated.
assert!(should_translate("S-1-5-18", offline));
assert!(!is_account_sid("S-1-5-18"));
let dom = "S-1-5-21-9-9-9-1001";
assert!(is_account_sid(dom));
assert!(should_translate(dom, reachable));
assert!(!should_translate(dom, offline));
assert!(should_translate(dom, workgroup));
}
#[test]
fn resolver_caches_hits_and_falls_back_to_raw() {
let mut r = SidResolver::default();
r.cache.insert("S-1-5-21-1-2-3-1001".into(), "alice".into());
let ctx = DomainCtx {
domain_joined: true,
domain_reachable: true,
};
assert_eq!(r.resolve("S-1-5-21-1-2-3-1001", ctx), "alice");
// An unresolvable SID comes back raw and must not be cached.
assert_eq!(r.resolve("S-1-5-21-7-7-7-2002", ctx), "S-1-5-21-7-7-7-2002");
assert!(!r.cache.contains_key("S-1-5-21-7-7-7-2002"));
}
#[test]
fn shape_lock_uses_named_fields() {
// SOURCES[2] is the Security 4800 ("lock") entry.
let s = &SOURCES[2]; let ev = parse(
"",
"<EventData><Data Name='TargetUserName'>bob</Data><Data Name='SessionId'>1</Data></EventData>",
);
let Shaped::Emit(p) = shape(s, &ev, raw_user) else {
panic!("expected emit")
};
assert_eq!(p["user"], json!("bob"));
assert_eq!(p["session_id"], json!(1));
}
#[test]
fn shape_modern_standby_flags_payload() {
// SOURCES[9] is the Kernel-Power 506 ("sleep") entry.
let s = &SOURCES[9]; let ev = parse("", "");
let Shaped::Emit(p) = shape(s, &ev, raw_user) else {
panic!("expected emit")
};
assert_eq!(p, json!({ "standby": "modern" }));
}
#[test]
fn shape_kernel_boot_27_only_emits_hibernate_resume() {
// SOURCES[11] is the Kernel-Boot 27 ("resume") entry.
let s = &SOURCES[11]; let hib = parse("", "<EventData><Data>2</Data></EventData>");
let Shaped::Emit(p) = shape(s, &hib, raw_user) else {
panic!("expected emit")
};
assert_eq!(p, json!({ "from": "hibernate" }));
// Values other than 2 are skipped.
assert!(matches!(
shape(
s,
&parse("", "<EventData><Data>0</Data></EventData>"),
raw_user
),
Shaped::Skip
));
assert!(matches!(
shape(
s,
&parse("", "<EventData><Data>1</Data></EventData>"),
raw_user
),
Shaped::Skip
));
}
#[test]
fn shape_wake_detail_prefers_source_text_with_fallback() {
// SOURCES[12] is the Power-Troubleshooter 1 ("wake_detail") entry.
// No WakeSourceText is supplied, so WakeSourceType is the fallback.
let s = &SOURCES[12]; let ev = parse(
"",
"<EventData><Data Name='SleepTime'>2026-06-27T00:00:00Z</Data>\
<Data Name='WakeTime'>2026-06-27T01:00:00Z</Data>\
<Data Name='WakeSourceType'>5</Data></EventData>",
);
let Shaped::Emit(p) = shape(s, &ev, raw_user) else {
panic!("expected emit")
};
assert_eq!(p["sleep_start"], json!("2026-06-27T00:00:00Z"));
assert_eq!(p["wake_time"], json!("2026-06-27T01:00:00Z"));
assert_eq!(p["wake_source"], json!("5"));
}
#[test]
fn shape_bare_presence_is_null_payload() {
// SOURCES[4] is the Kernel-General 12 ("boot") entry.
let s = &SOURCES[4]; let Shaped::Emit(p) = shape(s, &parse("", ""), raw_user) else {
panic!("expected emit")
};
assert_eq!(p, Value::Null);
}
#[test]
fn watermarks_round_trip_through_disk() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("wm.json");
// A missing file loads as an empty map.
assert!(load_watermarks(&path).is_empty());
let mut w = Watermarks::new();
// The key is an arbitrary string here; it does not have to match the
// format produced by `source_key`.
w.insert("System:7001:logon".into(), 99);
save_watermarks(&path, &w);
assert_eq!(load_watermarks(&path).get("System:7001:logon"), Some(&99));
// A corrupt file also loads as an empty map.
std::fs::write(&path, b"{not json").unwrap();
assert!(load_watermarks(&path).is_empty());
}
#[test]
fn source_keys_are_unique() {
// Sorting then deduplicating must not shrink the list.
let mut keys: Vec<String> = SOURCES.iter().map(source_key).collect();
keys.sort();
let n = keys.len();
keys.dedup();
assert_eq!(keys.len(), n, "duplicate source key");
}
}