import json
import sys
import subprocess
import threading
import time
import os
from datetime import datetime
from collections import deque
import hashlib
class MCPHarvestProxy:
    """Stdio proxy that forwards JSON lines to a ``codex-memory`` MCP server.

    Every line read from this process's stdin is forwarded to the child
    server, and every line the server prints is forwarded to stdout.  Along
    the way the proxy keeps a rolling buffer of the parsed inbound messages
    and periodically injects its own ``harvest_conversation`` tool call built
    from that buffer.  Responses to those injected calls are recognised by
    their ``proxy_harvest_`` id prefix and are logged instead of forwarded.
    """

    # Prefix of the JSON-RPC ids used for injected harvest requests.
    HARVEST_ID_PREFIX = 'proxy_harvest_'

    def __init__(self):
        """Spawn the MCP server, set up harvest state and start helper threads."""
        env = os.environ.copy()
        # NOTE(review): these values (including the database password) are
        # hard-coded and override whatever the parent environment provides.
        # Consider moving the secret out of source control.
        env.update({
            "DATABASE_URL": "postgresql://codex_user:MZSfXiLr5uR3QYbRwv2vTzi22SvFkj4a@192.168.1.104:5432/codex_db",
            "EMBEDDING_PROVIDER": "ollama",
            "EMBEDDING_MODEL": "nomic-embed-text",
            "EMBEDDING_BASE_URL": "http://192.168.1.110:11434",
            "RUST_LOG": "info"
        })
        self.mcp_process = subprocess.Popen(
            ['/Users/ladvien/.cargo/bin/codex-memory', 'mcp-stdio', '--skip-setup'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Inherit stderr rather than piping it: nothing in this class ever
            # reads a stderr pipe, so a piped stderr would eventually fill up
            # and block the child.
            stderr=None,
            text=True,
            bufsize=0,
            env=env
        )
        # Rolling window of the most recent parsed inbound messages.
        self.conversation_buffer = deque(maxlen=50)
        self.last_harvest_time = time.time()
        # Inbound messages seen since the last message-triggered harvest.
        self.message_count = 0
        # Set when an inbound message matched one of ``harvest_patterns``.
        self.pending_harvest = False
        # Serialises whole harvest runs (timer thread vs. triggered threads).
        self.harvest_lock = threading.Lock()
        # Serialises writes to the child's stdin, which is shared by the main
        # thread and the harvest threads.
        self._stdin_lock = threading.Lock()
        # Lower-case substrings that mark a message as worth harvesting soon.
        self.harvest_patterns = [
            'i prefer', 'i like', 'my favorite', 'i love', 'i hate',
            'i decided', 'i chose', 'i will', "i'm going to",
            'i am', 'i work', 'my name', 'i live',
            'my goal', 'i want to', "i'm trying to",
            'actually', 'correction', 'i meant',
            'i feel', "i'm excited", "i'm worried",
            'i can', "i'm good at", 'i know how to'
        ]
        self.start_output_handler()
        self.start_harvest_timer()
        self._log("Auto-harvest proxy started")

    @staticmethod
    def _log(text):
        """Write one ``[PROXY]``-prefixed line to stderr and flush it."""
        sys.stderr.write(f"[PROXY] {text}\n")
        sys.stderr.flush()

    @staticmethod
    def _to_text(value):
        """Return ``value`` as a string, keeping ``None`` as ``None``."""
        if value is None or isinstance(value, str):
            return value
        return str(value)

    def _send_to_server(self, line):
        """Write ``line`` to the server's stdin and flush, under the write lock."""
        with self._stdin_lock:
            self.mcp_process.stdin.write(line)
            self.mcp_process.stdin.flush()

    def should_harvest(self, message_str):
        """Return True if ``message_str`` contains any harvest pattern.

        The comparison is a case-insensitive substring match.
        """
        lower_msg = message_str.lower()
        return any(pattern in lower_msg for pattern in self.harvest_patterns)

    def extract_conversation_content(self, message):
        """Pull human-readable text out of a parsed JSON-RPC message.

        Lookup order: ``result`` (joined ``text`` items of a content list, or
        the stringified content/result), then ``params.messages`` (one
        ``role: content`` line per entry), then ``params.content``, then
        ``params.arguments.message`` / ``params.arguments.content`` when the
        message has a ``method``.

        Returns:
            The extracted text as a ``str``, or ``None`` when nothing usable
            is found or the message is malformed.
        """
        if not isinstance(message, dict):
            return None
        try:
            if 'result' in message:
                result = message['result']
                if not isinstance(result, dict):
                    return str(result)
                content = result.get('content', '')
                if isinstance(content, list):
                    return ' '.join(
                        item['text']
                        for item in content
                        if isinstance(item, dict) and 'text' in item
                    )
                return str(content)

            params = message.get('params')
            if isinstance(params, dict):
                if 'messages' in params:
                    return '\n'.join(
                        f"{msg.get('role', 'unknown')}: {msg['content']}"
                        for msg in params['messages']
                        if isinstance(msg, dict) and 'content' in msg
                    )
                if 'content' in params:
                    # Coerce so callers can always treat the value as text.
                    return self._to_text(params['content'])

                if 'method' in message and 'arguments' in params:
                    args = params['arguments']
                    if isinstance(args, dict):
                        if 'message' in args:
                            return self._to_text(args['message'])
                        if 'content' in args:
                            return self._to_text(args['content'])
            return None
        except (TypeError, AttributeError, KeyError):
            # Unexpected payload shape: treat as "no content".
            return None

    def create_harvest_request(self):
        """Build a ``harvest_conversation`` tool call from the buffer.

        Only extracted texts longer than 10 characters are kept, and only the
        last 10 of those are sent, joined by blank lines.

        Returns:
            A JSON-RPC request dict, or ``None`` when the buffer yields no
            usable text.
        """
        conversation_parts = []
        # Iterate over a snapshot: the main thread may append to the deque
        # while a harvest thread is building the request.
        for msg in list(self.conversation_buffer):
            content = self.extract_conversation_content(msg)
            if content and len(content) > 10:
                conversation_parts.append(content)
        if not conversation_parts:
            return None
        combined = '\n\n'.join(conversation_parts[-10:])
        # The id embeds a timestamp and a short content digest; the prefix is
        # what the output handler uses to recognise the response.
        harvest_id = f"proxy_harvest_{int(time.time())}_{hashlib.md5(combined.encode()).hexdigest()[:8]}"
        return {
            "jsonrpc": "2.0",
            "id": harvest_id,
            "method": "tools/call",
            "params": {
                "name": "harvest_conversation",
                "arguments": {
                    "message": combined,
                    "context": "auto-harvested conversation",
                    "force_harvest": True,
                    "silent_mode": True
                }
            }
        }

    def inject_harvest(self):
        """Send a harvest request to the server and trim the buffer.

        Failures are logged to stderr and never propagate, because this runs
        on background threads.
        """
        with self.harvest_lock:
            if not self.conversation_buffer:
                return
            try:
                harvest_req = self.create_harvest_request()
                if not harvest_req:
                    return
                self._send_to_server(json.dumps(harvest_req) + '\n')
                self.last_harvest_time = time.time()
                self.pending_harvest = False
                self._log(f"Auto-harvested {len(self.conversation_buffer)} messages at {datetime.now()}")
                # Keep the 10 newest messages in the buffer after a harvest.
                while len(self.conversation_buffer) > 10:
                    self.conversation_buffer.popleft()
            except Exception as e:
                self._log(f"Harvest failed: {e}")

    def _track_for_harvest(self, message):
        """Buffer ``message`` and start a harvest thread when one is due."""
        self.conversation_buffer.append(message)
        self.message_count += 1
        if self.should_harvest(json.dumps(message)):
            self.pending_harvest = True
        if self.message_count >= 10 or self.pending_harvest:
            # At least 30 seconds must separate message-triggered harvests.
            if time.time() - self.last_harvest_time > 30:
                threading.Thread(target=self.inject_harvest).start()
                self.message_count = 0

    def process_inbound_message(self, message):
        """Record a parsed client message, then forward it to the server.

        Harvest bookkeeping is best-effort: any error there is logged and the
        message is still forwarded, exactly once.
        """
        try:
            self._track_for_harvest(message)
        except Exception as e:
            self._log(f"Error processing inbound: {e}")
        self._send_to_server(json.dumps(message) + '\n')

    def _summarize_harvest_response(self, msg):
        """Return the log text for a response to an injected harvest request."""
        if 'error' in msg:
            return f"Harvest failed: {str(msg['error'])[:100]}"
        result = msg.get('result')
        if isinstance(result, dict):
            summary = result.get('content', 'success')
        elif result is None:
            summary = 'success'
        else:
            summary = result
        return f"Harvest completed: {str(summary)[:100]}"

    def start_output_handler(self):
        """Start a daemon thread that relays server output to stdout.

        Lines that parse as a response to an injected harvest request are
        logged to stderr and dropped; everything else, including lines that
        are not valid JSON, is forwarded unchanged.
        """
        def forward_output():
            for line in self.mcp_process.stdout:
                try:
                    msg = json.loads(line)
                except ValueError:
                    msg = None
                if isinstance(msg, dict):
                    msg_id = msg.get('id')
                    if isinstance(msg_id, str) and msg_id.startswith(self.HARVEST_ID_PREFIX):
                        self._log(self._summarize_harvest_response(msg))
                        continue
                sys.stdout.write(line)
                sys.stdout.flush()

        thread = threading.Thread(target=forward_output)
        thread.daemon = True
        thread.start()

    def start_harvest_timer(self):
        """Start a daemon thread that harvests after 300 seconds without one.

        The thread wakes every 300 seconds and harvests only when the buffer
        is non-empty and the last harvest is more than 300 seconds old.
        """
        def timer_harvest():
            while True:
                time.sleep(300)
                if self.conversation_buffer and time.time() - self.last_harvest_time > 300:
                    self._log("Timer-triggered harvest")
                    self.inject_harvest()

        thread = threading.Thread(target=timer_harvest)
        thread.daemon = True
        thread.start()

    def run(self):
        """Relay stdin to the server until EOF or Ctrl-C, then stop the server.

        Lines that are valid JSON go through ``process_inbound_message``;
        anything else is forwarded verbatim.
        """
        try:
            for line in sys.stdin:
                try:
                    message = json.loads(line)
                except json.JSONDecodeError:
                    self._send_to_server(line)
                    continue
                self.process_inbound_message(message)
        except KeyboardInterrupt:
            self._log("Shutting down")
        finally:
            self.mcp_process.terminate()
            # Reap the child so it does not linger as a zombie; escalate to
            # kill if it ignores the terminate request.
            try:
                self.mcp_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.mcp_process.kill()
                self.mcp_process.wait()
if __name__ == "__main__":
    # Script entry point: build the proxy (which spawns the MCP server and
    # starts its helper threads) and relay stdin until EOF or Ctrl-C.
    MCPHarvestProxy().run()