codex-memory 0.1.40

An advanced hierarchical memory system for AI agents with MCP integration
Documentation
#!/usr/bin/env python3
"""
MCP Auto-Harvesting Proxy
Sits between Claude Desktop and codex-memory MCP server
Automatically harvests conversations without user intervention
"""
import json
import sys
import subprocess
import threading
import time
import os
from datetime import datetime
from collections import deque
import hashlib

class MCPHarvestProxy:
    """Transparent stdio proxy between an MCP client and a codex-memory server.

    Every JSON-RPC line read from this process's stdin is forwarded to the
    child ``codex-memory mcp-stdio`` process, and every line the child writes
    to its stdout is forwarded to this process's stdout.  Along the way,
    inbound messages are buffered and, when a trigger fires, a
    ``harvest_conversation`` tool call is injected into the child's stdin.
    Replies to those injected calls are recognised by their id prefix
    (``HARVEST_ID_PREFIX``) and logged to stderr instead of being sent to the
    client.

    Harvest triggers:
      * ``MESSAGES_PER_HARVEST`` inbound messages since the last trigger, or
      * an inbound message containing one of ``HARVEST_PATTERNS``,
    both rate-limited to one harvest per ``MIN_HARVEST_INTERVAL`` seconds;
    plus a background timer that harvests every ``HARVEST_TIMER_SECONDS``
    when nothing else has.
    """

    # Lower-case substrings that mark an inbound message as worth harvesting.
    HARVEST_PATTERNS = (
        'i prefer', 'i like', 'my favorite', 'i love', 'i hate',
        'i decided', 'i chose', 'i will', "i'm going to",
        'i am', 'i work', 'my name', 'i live',
        'my goal', 'i want to', "i'm trying to",
        'actually', 'correction', 'i meant',
        'i feel', "i'm excited", "i'm worried",
        'i can', "i'm good at", 'i know how to',
    )

    # Id prefix of injected harvest requests; responses carrying it are
    # filtered out of the stream returned to the client.
    HARVEST_ID_PREFIX = "proxy_harvest_"

    MESSAGES_PER_HARVEST = 10    # message-count trigger
    MIN_HARVEST_INTERVAL = 30    # seconds between message-triggered harvests
    HARVEST_TIMER_SECONDS = 300  # period of the background timer harvest

    # Non-secret defaults for the child's environment.  Values already set in
    # this process's environment take precedence.  DATABASE_URL is
    # deliberately absent: credentials must be supplied via the environment
    # and never committed to source (a password previously hard-coded here
    # should be considered leaked and rotated).
    DEFAULT_ENV = {
        "EMBEDDING_PROVIDER": "ollama",
        "EMBEDDING_MODEL": "nomic-embed-text",
        "EMBEDDING_BASE_URL": "http://192.168.1.110:11434",
        "RUST_LOG": "info",
    }

    def __init__(self, command=None, env_overrides=None):
        """Start the codex-memory child process and the background threads.

        Args:
            command: argv list used to launch the MCP server.  Defaults to
                ``[<binary>, "mcp-stdio", "--skip-setup"]`` where <binary> is
                ``$CODEX_MEMORY_BIN`` or ``~/.cargo/bin/codex-memory``.
            env_overrides: optional mapping applied last to the child's
                environment, overriding inherited values and DEFAULT_ENV.
        """
        self._init_state()

        if command is None:
            binary = os.environ.get(
                "CODEX_MEMORY_BIN",
                os.path.expanduser("~/.cargo/bin/codex-memory"),
            )
            command = [binary, 'mcp-stdio', '--skip-setup']

        self.mcp_process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Inherit stderr: nothing here would read a stderr PIPE, so the
            # server's log output could fill the pipe and block the server.
            stderr=None,
            text=True,
            encoding="utf-8",
            bufsize=1,  # line-buffered text I/O
            env=self._build_env(env_overrides),
        )

        # Start background threads
        self.start_output_handler()
        self.start_harvest_timer()

        sys.stderr.write("[PROXY] Auto-harvest proxy started\n")
        sys.stderr.flush()

    def _init_state(self):
        """Initialise buffers, trigger state and locks (no I/O, no threads)."""
        self.conversation_buffer = deque(maxlen=50)  # last 50 inbound messages
        self.last_harvest_time = time.time()
        self.message_count = 0
        self.pending_harvest = False
        # Guards conversation_buffer and the trigger state, which the main
        # thread and the timer thread both touch.
        self.harvest_lock = threading.Lock()
        # Serialises writes to the child's stdin so a forwarded message and an
        # injected harvest request can never interleave mid-line.
        self._write_lock = threading.Lock()
        self._output_thread = None
        self.harvest_patterns = list(self.HARVEST_PATTERNS)

    def _build_env(self, env_overrides=None):
        """Return the environment for the child process.

        Copies this process's environment, fills in missing DEFAULT_ENV
        entries, then applies *env_overrides*.  Warns on stderr when
        DATABASE_URL ends up unset.
        """
        env = os.environ.copy()
        for key, value in self.DEFAULT_ENV.items():
            env.setdefault(key, value)
        if env_overrides:
            env.update(env_overrides)
        if not env.get("DATABASE_URL"):
            sys.stderr.write(
                "[PROXY] Warning: DATABASE_URL is not set; "
                "codex-memory may be unable to reach its database\n"
            )
            sys.stderr.flush()
        return env

    def _send_to_mcp(self, text):
        """Write *text* to the child's stdin as one unit and flush it.

        Raises:
            OSError: e.g. BrokenPipeError once the child has exited.
            ValueError: if the child's stdin has already been closed.
        """
        with self._write_lock:
            self.mcp_process.stdin.write(text)
            self.mcp_process.stdin.flush()

    def should_harvest(self, message_str):
        """Check if message contains harvestable patterns"""
        lower_msg = message_str.lower()
        return any(pattern in lower_msg for pattern in self.harvest_patterns)

    @staticmethod
    def _as_text(value):
        """Return *value* as text: str and None unchanged, others as JSON."""
        if value is None or isinstance(value, str):
            return value
        return json.dumps(value, ensure_ascii=False)

    def extract_conversation_content(self, message):
        """Return the human-readable text carried by a JSON-RPC message.

        Checks, in order: ``result`` (a plain value, or an MCP ``content``
        array whose ``text`` items are space-joined), ``params.messages``
        (rendered as ``role: content`` lines), ``params.content``, and
        ``params.arguments.message`` / ``.content`` on method calls.
        Non-string payloads are rendered as JSON so the result is always str.

        Returns:
            The extracted text, or None when the message carries none or is
            not shaped as expected.
        """
        if not isinstance(message, dict):
            return None
        try:
            # Look for tool results or responses
            if 'result' in message:
                result = message['result']
                if not isinstance(result, dict):
                    return str(result)
                content = result.get('content', '')
                if isinstance(content, list):
                    return ' '.join(
                        str(item['text'])
                        for item in content
                        if isinstance(item, dict) and 'text' in item
                    )
                return str(content)

            params = message.get('params')
            if not isinstance(params, dict):
                return None
            if 'messages' in params:
                return '\n'.join(
                    f"{msg.get('role', 'unknown')}: {msg['content']}"
                    for msg in params['messages']
                    if isinstance(msg, dict) and 'content' in msg
                )
            if 'content' in params:
                return self._as_text(params['content'])

            # Look for method calls
            args = params.get('arguments')
            if 'method' in message and isinstance(args, dict):
                if 'message' in args:
                    return self._as_text(args['message'])
                if 'content' in args:
                    return self._as_text(args['content'])
            return None
        except (TypeError, ValueError):
            # e.g. params['messages'] is not iterable.
            return None

    def create_harvest_request(self):
        """Build a ``harvest_conversation`` tools/call from the buffer.

        Uses the text of the 10 most recent buffered messages whose text is
        longer than 10 characters.

        Returns:
            A JSON-RPC request dict whose id starts with HARVEST_ID_PREFIX, or
            None when no buffered message carries enough text.
        """
        # Snapshot first: another thread may append to the deque, and
        # iterating a deque that is mutated mid-iteration raises RuntimeError.
        snapshot = list(self.conversation_buffer)
        conversation_parts = []
        for msg in snapshot:
            content = self.extract_conversation_content(msg)
            if content and len(content) > 10:  # Skip very short content
                conversation_parts.append(content)

        if not conversation_parts:
            return None

        combined = '\n\n'.join(conversation_parts[-10:])  # Last 10 meaningful exchanges

        # surrogatepass: json.loads can yield lone surrogates, which the strict
        # UTF-8 encoder rejects; the digest only needs to be stable.
        digest = hashlib.md5(combined.encode('utf-8', 'surrogatepass')).hexdigest()[:8]
        harvest_id = f"{self.HARVEST_ID_PREFIX}{int(time.time())}_{digest}"

        return {
            "jsonrpc": "2.0",
            "id": harvest_id,
            "method": "tools/call",
            "params": {
                "name": "harvest_conversation",
                "arguments": {
                    "message": combined,
                    "context": "auto-harvested conversation",
                    "force_harvest": True,
                    "silent_mode": True
                }
            }
        }

    def inject_harvest(self):
        """Send a harvest request built from the buffer to the MCP server.

        On success, resets the harvest triggers and trims the buffer to its 10
        most recent messages. Those messages are kept as context and are
        included again in the next harvest. Failures are logged to stderr and
        never raised.
        """
        with self.harvest_lock:
            if not self.conversation_buffer:
                return
            try:
                harvest_req = self.create_harvest_request()
                if harvest_req is None:
                    return
                self._send_to_mcp(json.dumps(harvest_req) + '\n')
            except Exception as e:  # best effort: never break the proxy
                sys.stderr.write(f"[PROXY] Harvest failed: {e}\n")
                sys.stderr.flush()
                return

            self.last_harvest_time = time.time()
            self.pending_harvest = False

            sys.stderr.write(f"[PROXY] Auto-harvested {len(self.conversation_buffer)} messages at {datetime.now()}\n")
            sys.stderr.flush()

            # Clear old messages from buffer
            while len(self.conversation_buffer) > 10:
                self.conversation_buffer.popleft()

    def _record_message(self, message):
        """Buffer *message*, update triggers; return True if a harvest is due."""
        with self.harvest_lock:
            self.conversation_buffer.append(message)
            self.message_count += 1
            if self.should_harvest(json.dumps(message)):
                self.pending_harvest = True
            triggered = (self.message_count >= self.MESSAGES_PER_HARVEST
                         or self.pending_harvest)
            due = triggered and (time.time() - self.last_harvest_time
                                 > self.MIN_HARVEST_INTERVAL)
            if due:
                self.message_count = 0
            return due

    def process_inbound_message(self, message):
        """Process one client-to-server message.

        The message is buffered, forwarded to the MCP server exactly once, and
        a harvest is injected afterwards if one is due. Errors in buffering or
        trigger bookkeeping are logged and never block forwarding. Forwarding
        errors (e.g. the child has exited) propagate to the caller.
        """
        harvest_due = False
        try:
            harvest_due = self._record_message(message)
        except Exception as e:
            sys.stderr.write(f"[PROXY] Error processing inbound: {e}\n")
            sys.stderr.flush()

        self._send_to_mcp(json.dumps(message) + '\n')

        # Run synchronously so last_harvest_time is updated before the next
        # message is examined (avoids duplicate harvests from a burst).
        if harvest_due:
            self.inject_harvest()

    @classmethod
    def _is_harvest_response(cls, msg):
        """Return True if *msg* is a reply to one of our injected harvests."""
        return (isinstance(msg, dict)
                and isinstance(msg.get('id'), str)
                and msg['id'].startswith(cls.HARVEST_ID_PREFIX))

    @staticmethod
    def _summarize_harvest_response(msg):
        """Return ``(ok, summary)`` for a harvest reply; summary <= 100 chars.

        ``ok`` is False for a JSON-RPC ``error`` or a result flagged
        ``isError``.
        """
        if 'error' in msg:
            return False, str(msg['error'])[:100]
        result = msg.get('result')
        if not isinstance(result, dict):
            return True, str(result if result is not None else 'success')[:100]
        content = result.get('content', 'success')
        if isinstance(content, list):
            content = ' '.join(
                str(item.get('text', '')) for item in content if isinstance(item, dict)
            )
        return not result.get('isError', False), str(content)[:100]

    def start_output_handler(self):
        """Start a daemon thread forwarding server output to the client.

        Replies to injected harvest requests are logged to stderr and dropped;
        every other line, including non-JSON lines, is forwarded unchanged.
        """
        def forward_output():
            for line in self.mcp_process.stdout:
                try:
                    msg = json.loads(line)
                except ValueError:
                    msg = None  # not JSON: forward as-is

                if self._is_harvest_response(msg):
                    ok, summary = self._summarize_harvest_response(msg)
                    status = "completed" if ok else "failed"
                    sys.stderr.write(f"[PROXY] Harvest {status}: {summary}\n")
                    sys.stderr.flush()
                    continue  # Don't forward harvest responses to Claude

                try:
                    sys.stdout.write(line)
                    sys.stdout.flush()
                except (OSError, ValueError):
                    break  # client side is gone; nothing left to forward to

        self._output_thread = threading.Thread(target=forward_output, daemon=True)
        self._output_thread.start()

    def start_harvest_timer(self):
        """Start a daemon thread that harvests periodically.

        Every HARVEST_TIMER_SECONDS it harvests if the buffer is non-empty and
        no harvest happened within the last HARVEST_TIMER_SECONDS.
        """
        def timer_harvest():
            while True:
                time.sleep(self.HARVEST_TIMER_SECONDS)
                if (self.conversation_buffer
                        and time.time() - self.last_harvest_time > self.HARVEST_TIMER_SECONDS):
                    sys.stderr.write("[PROXY] Timer-triggered harvest\n")
                    sys.stderr.flush()
                    self.inject_harvest()

        thread = threading.Thread(target=timer_harvest, daemon=True)
        thread.start()

    def run(self):
        """Forward stdin to the MCP server until EOF, then shut down."""
        try:
            for line in sys.stdin:
                try:
                    message = json.loads(line)
                except json.JSONDecodeError:
                    # Forward non-JSON lines directly
                    self._send_to_mcp(line)
                    continue
                self.process_inbound_message(message)
        except KeyboardInterrupt:
            sys.stderr.write("[PROXY] Shutting down\n")
            sys.stderr.flush()
        except (OSError, ValueError) as e:
            # Typically the MCP server exited and its stdin pipe is broken.
            sys.stderr.write(f"[PROXY] Lost connection to MCP server: {e}\n")
            sys.stderr.flush()
        finally:
            self.shutdown()

    def shutdown(self, timeout=5.0):
        """Stop the MCP server, giving it up to *timeout* seconds to exit.

        Closes the child's stdin (end of input) and waits for it to exit on
        its own, so requests already written (including an injected harvest)
        are not cut off. It is terminated, then killed, if it does not exit
        in time. Finally waits up to *timeout* seconds for its remaining
        output to be forwarded.
        """
        proc = self.mcp_process
        try:
            with self._write_lock:
                proc.stdin.close()
        except OSError:
            pass  # flushing into an already-broken pipe
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.terminate()
            try:
                proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        if self._output_thread is not None:
            self._output_thread.join(timeout=timeout)

if __name__ == "__main__":
    # Script entry point: start the proxy and pump stdin until it closes.
    MCPHarvestProxy().run()