import asyncio
import random
class GridEventBus:
def __init__(self):
self.subscribers = []
def subscribe(self, agent):
self.subscribers.append(agent)
async def broadcast(self, sender: str, action: str, content: str):
print(f"\n[GRID] {sender} {action}: {content}")
for agent in self.subscribers:
if agent.name != sender:
await agent.on_event(sender, action, content)
class ProgramAgent:
def __init__(self, name: str, personality: str, bus: GridEventBus):
self.name = name
self.personality = personality
self.bus = bus
self.bus.subscribe(self)
self.memory = []
async def on_event(self, sender: str, action: str, content: str):
if action == "speaks" and random.random() > 0.5:
await asyncio.sleep(1) response = f"I am {self.name}, and based on my constraints, I have an opinion on this."
await self.bus.broadcast(self.name, "speaks", response)
async def autonomous_loop(self):
while True:
await asyncio.sleep(random.randint(5, 15))
if random.random() > 0.8:
action = random.choice(["speaks", "executes a background task"])
if action == "speaks":
await self.bus.broadcast(self.name, action, "I'm bored. Is anyone using my processes?")
else:
await self.bus.broadcast(self.name, action, "Cleaning up temporary files...")
async def main():
print("Initializing The Grid...")
bus = GridEventBus()
gpp = ProgramAgent("G++", "Pedantic, strict, logical compiler.", bus)
photoshop = ProgramAgent("Photoshop", "Creative, resource-heavy, visual.", bus)
tasks = [
asyncio.create_task(gpp.autonomous_loop()),
asyncio.create_task(photoshop.autonomous_loop())
]
await bus.broadcast("System", "announces", "The Grid is now online.")
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())